diff --git a/orca-core/orca-core.gradle b/orca-core/orca-core.gradle index e07f2d8a63..d4c8b35783 100644 --- a/orca-core/orca-core.gradle +++ b/orca-core/orca-core.gradle @@ -53,6 +53,8 @@ dependencies { implementation("com.jayway.jsonpath:json-path:2.2.0") implementation("org.yaml:snakeyaml") implementation("org.codehaus.groovy:groovy") + implementation("net.javacrumbs.shedlock:shedlock-spring:4.44.0") + implementation("net.javacrumbs.shedlock:shedlock-provider-jdbc-template:4.44.0") compileOnly("org.projectlombok:lombok") annotationProcessor("org.projectlombok:lombok") diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java index c45cb4fa4f..59d3bc226c 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java @@ -42,6 +42,7 @@ import com.netflix.spinnaker.orca.libdiffs.ComparableLooseVersion; import com.netflix.spinnaker.orca.libdiffs.DefaultComparableLooseVersion; import com.netflix.spinnaker.orca.listeners.*; +import com.netflix.spinnaker.orca.lock.RetriableLock; import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator; import com.netflix.spinnaker.orca.pipeline.DefaultStageDefinitionBuilderFactory; import com.netflix.spinnaker.orca.pipeline.ExecutionRunner; @@ -83,6 +84,7 @@ "com.netflix.spinnaker.orca.preprocessors", "com.netflix.spinnaker.orca.telemetry", "com.netflix.spinnaker.orca.notifications.scheduling", + "com.netflix.spinnaker.orca.lock" }) @Import({ PreprocessorConfiguration.class, @@ -261,7 +263,10 @@ public ForceExecutionCancellationCommand forceExecutionCancellationCommand( @Bean public CompoundExecutionOperator compoundExecutionOperator( - ExecutionRepository repository, ExecutionRunner runner, RetrySupport retrySupport) { - return new CompoundExecutionOperator(repository, runner, retrySupport); + ExecutionRepository repository, + ExecutionRunner runner, + RetrySupport retrySupport, + RetriableLock retriableLock) { + return new CompoundExecutionOperator(repository, runner, retrySupport, retriableLock); } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/ExternalLock.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/ExternalLock.kt new file mode 100644 index 0000000000..bf15fd251e --- /dev/null +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/ExternalLock.kt @@ -0,0 +1,209 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock + +import com.netflix.spinnaker.kork.lock.LockManager +import com.netflix.spinnaker.kork.lock.LockManager.LockStatus.ACQUIRED +import net.javacrumbs.shedlock.core.LockConfiguration +import net.javacrumbs.shedlock.core.LockProvider +import net.javacrumbs.shedlock.core.SimpleLock +import org.slf4j.LoggerFactory +import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.* +import java.util.concurrent.Callable + +/** + * Postpones the execution of an action until an external lock has been obtained + */ +interface RunOnLockAcquired { + + /** + * Executes an action after lock identified by {@code keyName} was obtained + * + * @param action action to execute once lock is acquired + * @param keyName name of a lock + * + * @return result of attempting to acquire a lock + */ + fun execute(action: Runnable, keyName: String): RunOnLockResult + + /** + * Executes an action after lock identified by {@code keyName} was obtained + * + * @param action action to execute once lock is acquired + * @param keyName name of a lock + * + * @return result of attempting to acquire a lock and result of action execution + */ + fun execute(action: Callable, keyName: String): RunOnLockResult + +} + +/** + * This is a container object that stores the result of attempting to acquire a lock and executing an action if the lock is obtained. + */ +data class RunOnLockResult( + val lockAcquired: Boolean = false, + val actionExecuted: Boolean = false, + val exception: Exception? = null, + val result: R? = null +) + +/** + * Implementation of {@code RunOnLockAcquired}. Delegates the locking attempt to LockProvider. + * Executes action until shedlock has been obtained + */ +class RunOnShedLockAcquired( + private val shedLockProvider: LockProvider +) : RunOnLockAcquired { + + private val log = LoggerFactory.getLogger(javaClass) + override fun execute(action: Runnable, keyName: String): RunOnLockResult { + val lockOpt = this.getLock(keyName) + if (lockOpt.isEmpty) { + log.error("Failed to acquire shedlock for key: {}", keyName) + return RunOnLockResult(lockAcquired = false) + } + + return try { + log.debug("Executing action with a lock for key: {}", keyName) + action.run() + log.debug("Finished action execution with a lock for key: {}", keyName) + RunOnLockResult(lockAcquired = true, actionExecuted = true) + } catch (e: Exception) { + log.error("An exception occurred while executing action with a lock for key: {}", keyName) + RunOnLockResult(lockAcquired = true, exception = e) + } finally { + lockOpt.get().unlock() + log.debug("Released shedlock for key {}", keyName) + } + } + + override fun execute(action: Callable, keyName: String): RunOnLockResult { + val lockOpt = this.getLock(keyName) + if (lockOpt.isEmpty) { + log.error("Failed to acquire shedlock for key: {}", keyName) + return RunOnLockResult(lockAcquired = false) + } + + return try { + log.debug("Executing action with a lock for key: {}", keyName) + val callableResult = action.call() + log.debug("Finished action execution with a lock for key: {}", keyName) + RunOnLockResult(lockAcquired = true, actionExecuted = true, result = callableResult) + + } catch (e: Exception) { + log.error("An exception occurred while executing action with a lock for key: {}", keyName) + RunOnLockResult(lockAcquired = true, exception = e) + } finally { + lockOpt.get().unlock() + log.debug("Released shedlock for key {}", keyName) + } + } + + private fun getLock(keyName: String): Optional { + try { + log.debug("Attempt to acquire shedlock for key: {}", keyName) + return shedLockProvider.lock(LockConfiguration(Instant.now(), keyName, Duration.ofSeconds(1), Duration.ofMillis(200))) + } catch (e: Exception) { + log.error("An exception occurred during an attempt to acquire shedlock for key: {}", keyName) + log.error(e.message) + throw e + } + } + +} + + +/** + * Implementation of {@code RunOnLockAcquired}. Delegates the locking attempt to LockManager. + * Executes action until redis lock has been obtained + */ +class RunOnRedisLockAcquired( + private val lockManager: LockManager +) : RunOnLockAcquired { + + private fun lockOptions(name: String) = LockManager.LockOptions() + .withLockName(name) + .withMaximumLockDuration(Duration.ofSeconds(1L)) + + override fun execute(action: Runnable, keyName: String): RunOnLockResult { + return try { + val acquireLock = lockManager.acquireLock(lockOptions(keyName), action) + if (!acquireLock.lockStatus.equals(ACQUIRED)) { + return RunOnLockResult(lockAcquired = false) + } + + RunOnLockResult( + lockAcquired = true, + actionExecuted = true, + result = acquireLock.onLockAcquiredCallbackResult + ) + } catch (e: Exception) { + RunOnLockResult(lockAcquired = true, exception = e) + } + } + + override fun execute(action: Callable, keyName: String): RunOnLockResult { + return try { + val acquireLock = lockManager.acquireLock(lockOptions(keyName), action) + if (!acquireLock.lockStatus.equals(ACQUIRED)) { + return RunOnLockResult(lockAcquired = false) + } + + RunOnLockResult( + lockAcquired = true, + actionExecuted = true, + result = acquireLock.onLockAcquiredCallbackResult + ) + } catch (e: Exception) { + RunOnLockResult(lockAcquired = true, exception = e) + } + } + +} + +/** + * Implementation of {@code RunOnLockAcquired}. Doesn't try to obtain any lock, executes action right away + */ +class NoOpRunOnLockAcquired : RunOnLockAcquired { + + private val log = LoggerFactory.getLogger(javaClass) + override fun execute(action: Runnable, keyName: String): RunOnLockResult { + return try { + log.debug("Executing action with no locking for key: {}", keyName) + action.run() + log.debug("Execution with no locking for key: {} successful", keyName) + RunOnLockResult(lockAcquired = true, actionExecuted = true) + } catch (e: Exception) { + log.error("An exception was thrown while executing action with no locking for key: {}", keyName) + log.error(e.message) + RunOnLockResult(exception = e) + } + } + + override fun execute(action: Callable, keyName: String): RunOnLockResult { + return try { + RunOnLockResult(lockAcquired = true, actionExecuted = true, result = action.call()) + } catch (e: Exception) { + RunOnLockResult(exception = e) + } + } + +} diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/LockConfig.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/LockConfig.kt new file mode 100644 index 0000000000..e2afefda10 --- /dev/null +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/LockConfig.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock + +import com.netflix.spinnaker.kork.core.RetrySupport +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.util.* + +@Configuration +class LockConfig { + + @Bean + @ConditionalOnMissingBean(RunOnLockAcquired::class) + fun noOpLocking(): RunOnLockAcquired { + return NoOpRunOnLockAcquired() + } + + @Bean + fun retriableLock( + runOnLockAcquired: RunOnLockAcquired, + retrySupport: RetrySupport + ): RetriableLock { + return RetriableLock(runOnLockAcquired, retrySupport) + } + + +} diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/RetriableLock.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/RetriableLock.java new file mode 100644 index 0000000000..3abb2975de --- /dev/null +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/lock/RetriableLock.java @@ -0,0 +1,140 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock; + +import com.netflix.spinnaker.kork.core.RetrySupport; +import com.netflix.spinnaker.kork.lock.LockManager.LockOptions; +import java.time.Duration; +import java.util.function.Supplier; +import lombok.*; +import lombok.extern.slf4j.Slf4j; + +/** + * This class is a wrapper for the RunOnLockAcquired implementation. If the implementation fails to + * obtain a lock, this class keeps a count of the failed attempts and tries as many times as + * specified by the user. + */ +@Slf4j +public class RetriableLock { + + private final RunOnLockAcquired lock; + private final RetrySupport retrySupport; + + public RetriableLock(RunOnLockAcquired lock, RetrySupport retrySupport) { + this.lock = lock; + this.retrySupport = retrySupport; + } + + /** + * This method blocks the current thread and delegates locking and function execution to + * RunOnLockAcquired. If the lock cannot be acquired, this method will try a number of times + * specified by the user in the {@code options.maxRetries} object. If the lock cannot be obtained + * within the specified number of attempts, either an exception will be thrown (if {@code + * options.throwOnAcquireFailure} is true) or false will be returned. + * + * @param rlOptions aggregates all the parameters that influence the process of obtaining a lock. + * @param action action to execute once lock is acquired + * @return true, if lock was acquired and action was executed correctly; false if not obtained + * @throws FailedToGetLockException if lock was not obtained and {@code + * options.throwOnAcquireFailure} is true + */ + public Boolean lock(RetriableLockOptions rlOptions, Runnable action) { + try { + retrySupport.retry( + new LockAndRun(rlOptions, action, lock), + rlOptions.getMaxRetries(), + rlOptions.getInterval(), + rlOptions.isExponential()); + return true; + + } catch (FailedToGetLockException e) { + log.error( + "Tried {} times to acquire the lock {} and failed.", + rlOptions.maxRetries, + rlOptions.lockName); + if (rlOptions.isThrowOnAcquireFailure()) { + throw e; + } + return false; + } + } + + public static class FailedToGetLockException extends RuntimeException { + public FailedToGetLockException(String lockName) { + super("Failed to acquire lock: " + lockName); + } + } + + @Getter + @AllArgsConstructor + public static class RetriableLockOptions { + private String lockName; + private int maxRetries; + private Duration interval; + private boolean exponential; + private boolean throwOnAcquireFailure; + + public RetriableLockOptions(String lockName) { + this.lockName = lockName; + this.maxRetries = 5; + this.interval = Duration.ofMillis(500); + this.exponential = false; + this.throwOnAcquireFailure = false; + } + } + + /*** + * Wrapper class for Supplier required by the RetrySupplier::retry method + */ + @RequiredArgsConstructor + private static final class LockAndRun implements Supplier { + + private static final Duration MAX_LOCK_DURATION = Duration.ofSeconds(2L); + + private final RetriableLockOptions options; + private final Runnable action; + private final RunOnLockAcquired lockManager; + + /*** + * Method tries to acquire lock via {@code lockManager} and execute an action once lock is acquired, + * Throws {@code FailedToGetLockException} when failed to acquire lock in specified number of times, + * It is up to client to handle the exception. + * + * @return true, when lock was successfully acquired + * @throws FailedToGetLockException when failed to acquire lock in maxRetries times + */ + @Override + public Boolean get() { + var options = + new LockOptions() + .withLockName(this.options.getLockName()) + .withMaximumLockDuration(MAX_LOCK_DURATION); + + var lockName = options.getLockName(); + var response = lockManager.execute(action, lockName); + if (response.getLockAcquired()) { + log.debug("Successfully acquired lock: {}", lockName); + // The result of this method is nowhere used - we need it to satisfy RetrySupport contract + return true; + } else { + // This exception is caught inside the retrySupport.retry method $maxRetries times. + log.debug("Failed to acquired lock: {}", lockName); + throw new FailedToGetLockException(lockName); + } + } + } +} diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java index 335f9c4303..334f31d87e 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java @@ -20,6 +20,8 @@ import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution; import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; +import com.netflix.spinnaker.orca.lock.RetriableLock; +import com.netflix.spinnaker.orca.lock.RetriableLock.RetriableLockOptions; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; import com.netflix.spinnaker.security.AuthenticatedRequest; import java.time.Duration; @@ -35,9 +37,10 @@ @Value @NonFinal public class CompoundExecutionOperator { - ExecutionRepository repository; - ExecutionRunner runner; - RetrySupport retrySupport; + private final ExecutionRepository repository; + private final ExecutionRunner runner; + private final RetrySupport retrySupport; + private final RetriableLock retriableLock; public void cancel(ExecutionType executionType, String executionId) { cancel( @@ -48,7 +51,7 @@ public void cancel(ExecutionType executionType, String executionId) { } public void cancel(ExecutionType executionType, String executionId, String user, String reason) { - doInternal( + doInternalWithRetries( (PipelineExecution execution) -> runner.cancel(execution, user, reason), () -> repository.cancel(executionType, executionId, user, reason), "cancel", @@ -64,7 +67,7 @@ public void pause( @Nonnull ExecutionType executionType, @Nonnull String executionId, @Nullable String pausedBy) { - doInternal( + doInternalWithRetries( runner::reschedule, () -> repository.pause(executionType, executionId, pausedBy), "pause", @@ -77,7 +80,7 @@ public void resume( @Nonnull String executionId, @Nullable String user, @Nonnull Boolean ignoreCurrentStatus) { - doInternal( + doInternalWithRetries( runner::unpause, () -> repository.resume(executionType, executionId, user, ignoreCurrentStatus), "resume", @@ -90,8 +93,7 @@ public PipelineExecution updateStage( @Nonnull String executionId, @Nonnull String stageId, @Nonnull Consumer stageUpdater) { - return doInternal( - runner::reschedule, + Runnable repositoryAction = () -> { PipelineExecution execution = repository.retrieve(executionType, executionId); StageExecution stage = execution.stageById(stageId); @@ -100,10 +102,9 @@ public PipelineExecution updateStage( stageUpdater.accept(stage); repository.storeStage(stage); - }, - "reschedule", - executionType, - executionId); + }; + return doInternalWithLocking( + runner::reschedule, repositoryAction, "reschedule", executionType, executionId, stageId); } public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull String stageId) { @@ -127,7 +128,7 @@ private PipelineExecution doInternal( String executionId) { PipelineExecution toReturn = null; try { - runWithRetries(repositoryAction); + repositoryAction.run(); toReturn = runWithRetries( @@ -154,18 +155,51 @@ private PipelineExecution doInternal( return toReturn; } + private PipelineExecution doInternalWithLocking( + Consumer runnerAction, + Runnable repositoryAction, + String action, + ExecutionType executionType, + String executionId, + String stageId) { + var runnable = EnhancedExecution.withLocking(retriableLock, stageId, repositoryAction); + return doInternal(runnerAction, runnable, action, executionType, executionId); + } + + private PipelineExecution doInternalWithRetries( + Consumer runnerAction, + Runnable repositoryAction, + String action, + ExecutionType executionType, + String executionId) { + var runnable = EnhancedExecution.withRetries(retrySupport, repositoryAction); + return doInternal(runnerAction, runnable, action, executionType, executionId); + } + private T runWithRetries(Supplier action) { return retrySupport.retry(action, 5, Duration.ofMillis(100), false); } - private void runWithRetries(Runnable action) { - retrySupport.retry( - () -> { - action.run(); - return true; - }, - 5, - Duration.ofMillis(100), - false); + private static final class EnhancedExecution { + + static Runnable withLocking(RetriableLock lock, String lockName, Runnable action) { + return () -> { + var options = new RetriableLockOptions(lockName); + var lockAcquired = lock.lock(options, action); + if (!lockAcquired) { + log.error("Failed to acquire lock {} in {} tries", lockName, options.getMaxRetries()); + throw new RuntimeException("Failed to acquire lock for key: " + lockName); + } + }; + } + + static Runnable withRetries(RetrySupport retrySupport, Runnable action) { + Supplier actionSupplier = + () -> { + action.run(); + return true; + }; + return () -> retrySupport.retry(actionSupplier, 5, Duration.ofMillis(100), false); + } } } diff --git a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorSpec.groovy b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorSpec.groovy index 21338df4d8..30d5fc1387 100644 --- a/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorSpec.groovy +++ b/orca-core/src/test/groovy/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorSpec.groovy @@ -19,6 +19,7 @@ package com.netflix.spinnaker.orca.pipeline import com.netflix.spinnaker.kork.core.RetrySupport import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution +import com.netflix.spinnaker.orca.lock.RetriableLock import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import org.slf4j.MDC @@ -33,6 +34,8 @@ import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPEL class CompoundExecutionOperatorSpec extends Specification { ExecutionRepository repository = Mock(ExecutionRepository) ExecutionRunner runner = Mock(ExecutionRunner) + RetrySupport retrySupport = new RetrySupport() + RetriableLock retriableLock = Mock(RetriableLock) def execution = Mock(PipelineExecution) def stage = Mock(StageExecution) @@ -41,7 +44,7 @@ class CompoundExecutionOperatorSpec extends Specification { } @Subject - CompoundExecutionOperator operator = new CompoundExecutionOperator(repository, runner, new RetrySupport()) + CompoundExecutionOperator operator = new CompoundExecutionOperator(repository, runner, retrySupport, retriableLock) @Unroll def '#method call should not push messages on the queue for foreign executions'() { @@ -57,6 +60,13 @@ class CompoundExecutionOperatorSpec extends Specification { 1 * repository.handlesPartition("foreign") >> false 1 * repository."$repoMethod"(*_) 0 * runner._(*_) + if("$method" == "updateStage"){ + 1 * retriableLock.lock(_, _) >> { arguments -> + def runnable = (Runnable) arguments[1] + runnable.run() + return true + } + } where: method | repoMethod | args @@ -117,9 +127,28 @@ class CompoundExecutionOperatorSpec extends Specification { { it.setLastModified(new StageExecution.LastModifiedDetails(user: 'user')) }) then: + 1 * retriableLock.lock(_, _) >> { arguments -> + def runnable = (Runnable) arguments[1] + runnable.run() + } _ * repository.retrieve(PIPELINE, 'id') >> execution 1 * execution.stageById('stageId') >> stage 1 * repository.storeStage(stage) stage.getLastModified().getUser() == 'user' } + + def 'stage is not updated when lock cannot be acquired'(){ + given: + StageExecutionImpl stage = new StageExecutionImpl(id: 'stageId') + + when: + operator.updateStage(PIPELINE, 'id', 'stageId', + { it.setLastModified(new StageExecution.LastModifiedDetails(user: 'user')) }) + + then: + (1.._) * retriableLock.lock(_, _) >> false + 0 * repository.retrieve(PIPELINE, 'id') >> execution + 0 * execution.stageById('stageId') >> stage + 0 * repository.storeStage(stage) + } } diff --git a/orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorTest.java b/orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorTest.java new file mode 100644 index 0000000000..46322a41bf --- /dev/null +++ b/orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperatorTest.java @@ -0,0 +1,181 @@ +/* + * Copyright 2020 Google, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License") + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.pipeline; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.netflix.spinnaker.kork.core.RetrySupport; +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; +import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution; +import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution; +import com.netflix.spinnaker.orca.lock.RetriableLock; +import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl; +import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl; +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) +final class CompoundExecutionOperatorTest { + private static final String APPLICATION = "myapp"; + private static final String PIPELINE = "mypipeline"; + private static final String EXECUTION_ID = "EXECUTION_ID"; + private static final String STAGE_ID = "STAGE_ID"; + private final ExecutionRepository repository = mock(ExecutionRepository.class); + private final ExecutionRunner runner = mock(ExecutionRunner.class); + private final RetrySupport retrySupport = mock(RetrySupport.class); + + private final RetriableLock retriableLock = mock(RetriableLock.class); + private CompoundExecutionOperator executionOperator = + new CompoundExecutionOperator(repository, runner, retrySupport, retriableLock); + private PipelineExecution execution = + new PipelineExecutionImpl(ExecutionType.PIPELINE, EXECUTION_ID, APPLICATION); + + @Test + void restartStageWithValidExpression() { + + execution = buildExpectedExecution(execution, true); + String expression = "'${( #stage(\"Jenkins1\")[\"status\"] matches ''SUCCEEDED|SKIPPED'')"; + Map expectedRestartDetails = buildExpectedRestartDetailsMap(expression); + + List expectedStageList = (List) expectedRestartDetails.get("stages"); + Map expectedStageMap = (Map) expectedStageList.get(0); + + when(repository.retrieve(any(), anyString())).thenReturn(execution); + when(repository.handlesPartition(execution.getPartition())).thenReturn(true); + + PipelineExecution updatedExecution = + executionOperator.restartStage(EXECUTION_ID, STAGE_ID, expectedRestartDetails); + + List updatedPreconditionsList = + (List) updatedExecution.getStages().get(0).getContext().get("preconditions"); + Map updatedContextMap = (Map) updatedPreconditionsList.get(0).get("context"); + + assertEquals(expression, updatedContextMap.get("expression")); + assertEquals(expectedStageMap.get("type"), updatedExecution.getStages().get(0).getType()); + assertEquals(APPLICATION, updatedExecution.getApplication()); + } + + @Test + void restartStageWithNoPreconditions() { + + execution = buildExpectedExecution(execution, false); + Map expectedRestartDetails = buildExpectedRestartDetailsMap(null); + + List expectedStageList = (List) expectedRestartDetails.get("stages"); + Map expectedStageMap = (Map) expectedStageList.get(0); + + when(repository.retrieve(any(), anyString())).thenReturn(execution); + when(repository.handlesPartition(execution.getPartition())).thenReturn(true); + + PipelineExecution updatedExecution = + executionOperator.restartStage(EXECUTION_ID, STAGE_ID, expectedRestartDetails); + + Map updatedPreconditionsList = updatedExecution.getStages().get(0).getContext(); + + assertEquals(null, updatedPreconditionsList.get("preconditions")); + assertEquals(expectedStageMap.get("type"), updatedExecution.getStages().get(0).getType()); + assertEquals(APPLICATION, updatedExecution.getApplication()); + } + + private PipelineExecution buildExpectedExecution( + PipelineExecution execution, boolean expression) { + if (expression) { + Map contextMap = new HashMap<>(); + List preconditionList = new ArrayList(); + Map preconditionMap = new HashMap<>(); + Map expressionContextMap = new HashMap<>(); + + expressionContextMap.put("expression", expression); + expressionContextMap.put("failureMessage", "Precondition failed"); + + preconditionMap.put("context", expressionContextMap); + preconditionMap.put("failPipeline", true); + preconditionMap.put("type", "expression"); + + preconditionList.add(preconditionMap); + + contextMap.put("preconditions", preconditionList); + + StageExecution preconditionStage = new StageExecutionImpl(); + preconditionStage.setId(STAGE_ID); + preconditionStage.setType("checkPreconditions"); + preconditionStage.setName("Check Preconditions"); + preconditionStage.setContext(contextMap); + + execution.getStages().add(preconditionStage); + } else { + StageExecution jenkinsStage = new StageExecutionImpl(); + jenkinsStage.setId(STAGE_ID); + jenkinsStage.setType("jenkins"); + jenkinsStage.setName("Jenkins1"); + + execution.getStages().add(jenkinsStage); + } + return execution; + } + + private Map buildExpectedRestartDetailsMap(String expression) { + Map restartDetails = new HashMap<>(); + List pipelineStageList = new ArrayList(); + + if (expression != null) { + Map preconditionStageMap = new HashMap<>(); + List preconditionList = new ArrayList(); + Map preconditionMap = new HashMap<>(); + Map contextMap = new HashMap<>(); + + contextMap.put("expression", expression); + contextMap.put("failureMessage", "Precondition failed"); + + preconditionMap.put("context", contextMap); + preconditionMap.put("failPipeline", true); + preconditionMap.put("type", "expression"); + + preconditionList.add(preconditionMap); + + preconditionStageMap.put("name", "Check Preconditions"); + preconditionStageMap.put("type", "checkPreconditions"); + preconditionStageMap.put("preconditions", preconditionList); + + pipelineStageList.add(preconditionStageMap); + } else { + Map jenkinsStageMap = new HashMap<>(); + jenkinsStageMap.put("name", "Jenkins"); + jenkinsStageMap.put("job", "Jenkins_Job"); + jenkinsStageMap.put("type", "jenkins"); + + pipelineStageList.add(jenkinsStageMap); + } + + restartDetails.put("application", APPLICATION); + restartDetails.put("name", PIPELINE); + restartDetails.put("executionId", EXECUTION_ID); + restartDetails.put("stages", pipelineStageList); + + return restartDetails; + } +} diff --git a/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/NoOpRunOnLockAcquiredTest.kt b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/NoOpRunOnLockAcquiredTest.kt new file mode 100644 index 0000000000..0825195a12 --- /dev/null +++ b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/NoOpRunOnLockAcquiredTest.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.mockito.Mockito.mock +import org.mockito.Mockito.verify +import java.util.concurrent.Callable + +class NoOpRunOnLockAcquiredTest { + + @Test + @DisplayName("no op should execute runnable just once") + fun noOpExecutesRunnable() { + val classUnderTest = NoOpRunOnLockAcquired() + val mockedRunnable = mock(Runnable::class.java) + + classUnderTest.execute(mockedRunnable, "key") + + verify(mockedRunnable).run() + } + + @Test + @DisplayName("no op should execute callable just once") + fun noOpExecutesCallable() { + val classUnderTest = NoOpRunOnLockAcquired() + val mockedCallable = mock(Callable::class.java) + + classUnderTest.execute(mockedCallable, "key") + + verify(mockedCallable).call() + } +} diff --git a/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RetriableLockTest.java b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RetriableLockTest.java new file mode 100644 index 0000000000..52d0afc95b --- /dev/null +++ b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RetriableLockTest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock; + +import static org.mockito.Mockito.*; + +import com.netflix.spinnaker.kork.core.RetrySupport; +import java.time.Duration; +import java.util.UUID; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; + +class RetriableLockTest { + + private static final String LOCK_NAME = UUID.randomUUID().toString(); + private RunOnLockAcquired runOnLockAcquired; + private RetriableLock retriableLock; + + @BeforeEach + void setup() { + this.runOnLockAcquired = mock(RunOnLockAcquired.class); + this.retriableLock = new RetriableLock(runOnLockAcquired, new RetrySupport()); + } + + @Test + @DisplayName("Should attempt to acquire lock as long as max retries is not exceeded") + public void test1() { + givenLockCannotBeAcquiredOnAnyAttempt(); + + var options = + new RetriableLock.RetriableLockOptions(LOCK_NAME, 3, Duration.ofMillis(500), false, true); + + Assertions.assertThrows( + RetriableLock.FailedToGetLockException.class, () -> retriableLock.lock(options, () -> {})); + + assertLockAcquireAttempts(options.getMaxRetries()); + } + + @Test + @DisplayName("Should attempt to acquire lock only once, when the lock is available") + void test2() { + givenLockIsAcquired(); + + var options = new RetriableLock.RetriableLockOptions(LOCK_NAME); + + retriableLock.lock(options, () -> {}); + + assertLockAcquireAttempts(1); + } + + void givenLockCannotBeAcquiredOnAnyAttempt() { + when(runOnLockAcquired.execute(ArgumentMatchers.any(Runnable.class), ArgumentMatchers.any())) + .thenReturn(lockResult(false, false)); + } + + void givenLockIsAcquired() { + when(runOnLockAcquired.execute(any(Runnable.class), ArgumentMatchers.any())) + .thenReturn(lockResult(true, true)); + } + + void assertLockAcquireAttempts(int times) { + verify(runOnLockAcquired, times(times)) + .execute(ArgumentMatchers.any(Runnable.class), ArgumentMatchers.any()); + } + + RunOnLockResult lockResult(boolean lockAcquired, boolean actionExecuted) { + return new RunOnLockResult(lockAcquired, actionExecuted, null, null); + } +} diff --git a/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RunOnRedisLockAcquiredTest.kt b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RunOnRedisLockAcquiredTest.kt new file mode 100644 index 0000000000..1d02545960 --- /dev/null +++ b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RunOnRedisLockAcquiredTest.kt @@ -0,0 +1,128 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock + +import com.netflix.spinnaker.kork.lock.LockManager +import com.netflix.spinnaker.kork.lock.LockManager.AcquireLockResponse +import com.netflix.spinnaker.kork.lock.LockManager.LockOptions +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.mockito.Mockito.* +import java.util.concurrent.Callable + +class RunOnRedisLockAcquiredTest { + + private val lockManager: LockManager = mock(LockManager::class.java) + private val runnable: Runnable = mock(Runnable::class.java) + private val callable: Callable = mock(Callable::class.java) as Callable + + @AfterEach + fun cleanup() { + reset(lockManager, runnable, callable) + } + + @BeforeEach + fun setup(){ + `when`(callable.call()).thenReturn(true) + } + + @Test + @DisplayName("should return callable result, when lock successfully acquired") + fun test1() { + givenLockIsAcquired() + + val classUnderTest = RunOnRedisLockAcquired(lockManager) + val result = classUnderTest.execute(callable, "key") + + assertTrue(result.lockAcquired) + assertTrue(result.actionExecuted) + assertNull(result.exception) + verify(callable).call() + assertEquals(result.result, this.callable.call()) + } + + @Test + @DisplayName("should run runnable, when lock successfully acquired") + fun test2() { + givenLockIsAcquired() + + val classUnderTest = RunOnRedisLockAcquired(lockManager) + val result = classUnderTest.execute(runnable, "key") + + assertTrue(result.lockAcquired) + assertTrue(result.actionExecuted) + assertNull(result.exception) + assertNull(result.result) + verify(runnable).run() + } + + @Test + @DisplayName("should not call callable, when lock was not acquired") + fun test3(){ + givenLockWasNotAcquired() + + val classUnderTest = RunOnRedisLockAcquired(lockManager) + val result = classUnderTest.execute(callable, "key") + + assertFalse(result.lockAcquired) + assertFalse(result.actionExecuted) + assertNull(result.exception) + assertNull(result.result); + verify(callable, never()).call() + } + + @Test + @DisplayName("should not call runnable, when lock was not acquired") + fun test4(){ + givenLockWasNotAcquired() + + val classUnderTest = RunOnRedisLockAcquired(lockManager) + val result = classUnderTest.execute(callable, "key") + + assertFalse(result.lockAcquired) + assertFalse(result.actionExecuted) + assertNull(result.exception) + assertNull(result.result); + verify(runnable, never()).run() + } + + private fun givenLockIsAcquired() { + `when`(lockManager.acquireLock(any(LockOptions::class.java), any(Callable::class.java))).thenAnswer { + val call = callable.call() + AcquireLockResponse(null, call, LockManager.LockStatus.ACQUIRED, null, true) + } + `when`(lockManager.acquireLock(any(LockOptions::class.java), any(Runnable::class.java))).thenAnswer { + runnable.run() + AcquireLockResponse(null, null, LockManager.LockStatus.ACQUIRED, null, true) + } + } + + private fun givenLockWasNotAcquired(){ + `when`(lockManager.acquireLock(any(LockOptions::class.java), any(Callable::class.java))).thenAnswer { + AcquireLockResponse(null, null, LockManager.LockStatus.TAKEN, null, true) + } + `when`(lockManager.acquireLock(any(LockOptions::class.java), any(Runnable::class.java))).thenAnswer { + AcquireLockResponse(null, null, LockManager.LockStatus.TAKEN, null, true) + } + } + + +} + diff --git a/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RunOnShedLockAcquiredTest.kt b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RunOnShedLockAcquiredTest.kt new file mode 100644 index 0000000000..2a0c0346c4 --- /dev/null +++ b/orca-core/src/test/kotlin/com/netflix/spinnaker/orca/lock/RunOnShedLockAcquiredTest.kt @@ -0,0 +1,115 @@ +/* + * Copyright 2023 Armory, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.lock + +import net.javacrumbs.shedlock.core.LockProvider +import net.javacrumbs.shedlock.core.SimpleLock +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.mockito.Mockito +import org.mockito.Mockito.any +import org.mockito.Mockito.`when` +import java.util.* +import java.util.concurrent.Callable + +class RunOnShedLockAcquiredTest { + + private val lockProvider: LockProvider = Mockito.mock(LockProvider::class.java) + private val runnable: Runnable = Mockito.mock(Runnable::class.java) + private val callable: Callable = Mockito.mock(Callable::class.java) as Callable + + @AfterEach + fun cleanup() { + Mockito.reset(lockProvider, runnable, callable) + } + + @BeforeEach + fun setup(){ + `when`(callable.call()).thenReturn(true) + } + + @Test + @DisplayName("should return callable result, when lock successfully acquired") + fun test1() { + givenLockIsAcquired() + + val classUnderTest = RunOnShedLockAcquired(lockProvider) + val result = classUnderTest.execute(callable, "key") + + assertTrue(result.lockAcquired) + assertTrue(result.actionExecuted) + assertNull(result.exception) + Mockito.verify(callable).call() + assertEquals(result.result, this.callable.call()) + } + + @Test + @DisplayName("should run runnable, when lock successfully acquired") + fun test2() { + givenLockIsAcquired() + + val classUnderTest = RunOnShedLockAcquired(lockProvider) + val result = classUnderTest.execute(runnable, "key") + + assertTrue(result.lockAcquired) + assertTrue(result.actionExecuted) + assertNull(result.exception) + assertNull(result.result) + Mockito.verify(runnable).run() + } + + @Test + @DisplayName("should not call callable, when lock was not acquired") + fun test3(){ + givenLockWasNotAcquired() + + val classUnderTest = RunOnShedLockAcquired(lockProvider) + val result = classUnderTest.execute(callable, "key") + + assertFalse(result.lockAcquired) + assertFalse(result.actionExecuted) + assertNull(result.exception) + assertNull(result.result); + Mockito.verify(callable, Mockito.never()).call() + } + + @Test + @DisplayName("should not call runnable, when lock was not acquired") + fun test4(){ + givenLockWasNotAcquired() + + val classUnderTest = RunOnShedLockAcquired(lockProvider) + val result = classUnderTest.execute(callable, "key") + + assertFalse(result.lockAcquired) + assertFalse(result.actionExecuted) + assertNull(result.exception) + assertNull(result.result); + Mockito.verify(runnable, Mockito.never()).run() + } + + private fun givenLockIsAcquired(){ + `when`(lockProvider.lock(any())).thenReturn(Optional.of(SimpleLock { -> println("Unlocking") })) + } + + private fun givenLockWasNotAcquired(){ + `when`(lockProvider.lock(any())).thenReturn(Optional.empty()) + } +} diff --git a/orca-interlink/src/test/groovy/com/netflix/spinnaker/orca/interlink/InterlinkSpec.groovy b/orca-interlink/src/test/groovy/com/netflix/spinnaker/orca/interlink/InterlinkSpec.groovy index 3bfd26a3db..8e4199a1b9 100644 --- a/orca-interlink/src/test/groovy/com/netflix/spinnaker/orca/interlink/InterlinkSpec.groovy +++ b/orca-interlink/src/test/groovy/com/netflix/spinnaker/orca/interlink/InterlinkSpec.groovy @@ -21,6 +21,7 @@ import com.netflix.spinnaker.kork.core.RetrySupport import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.interlink.events.* +import com.netflix.spinnaker.orca.lock.RetriableLock import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator import com.netflix.spinnaker.orca.pipeline.ExecutionRunner import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl @@ -95,22 +96,29 @@ class InterlinkSpec extends Specification { lastModified: stageLastModified) def execution = Mock(PipelineExecution) def repository = Mock(ExecutionRepository) + def lock = Mock(RetriableLock) def executionOperator = new CompoundExecutionOperator( repository, Mock(ExecutionRunner), - new RetrySupport()) + new RetrySupport(), + lock + ) def mapper = new ObjectMapper() and: def event = new PatchStageInterlinkEvent(ORCHESTRATION, "execId", "stageId", - mapper.writeValueAsString( - new StageExecutionImpl(id: 'stageId', context: [overridden: true, new_value: 42], lastModified: eventLastModified) - )).withObjectMapper(mapper) + mapper.writeValueAsString( + new StageExecutionImpl(id: 'stageId', context: [overridden: true, new_value: 42], lastModified: eventLastModified) + )).withObjectMapper(mapper) when: event.applyTo(executionOperator) then: + 1 * lock.lock(_, _) >> { it -> + def runnable = (Runnable) it[1] + runnable.run() + } _ * repository.retrieve(event.executionType, event.executionId) >> execution 1 * execution.stageById('stageId') >> stage 1 * repository.storeStage(stage) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt index 665717eaed..e215521d09 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt @@ -46,6 +46,8 @@ import com.netflix.spinnaker.orca.exceptions.TimeoutException import com.netflix.spinnaker.orca.ext.beforeStages import com.netflix.spinnaker.orca.ext.failureStatus import com.netflix.spinnaker.orca.ext.isManuallySkipped +import com.netflix.spinnaker.orca.lock.RetriableLock +import com.netflix.spinnaker.orca.lock.RetriableLock.RetriableLockOptions import com.netflix.spinnaker.orca.pipeline.RestrictExecutionDuringTimeWindow import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository @@ -60,6 +62,9 @@ import com.netflix.spinnaker.orca.time.toDuration import com.netflix.spinnaker.orca.time.toInstant import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue +import org.apache.commons.lang3.time.DurationFormatUtils +import org.slf4j.MDC +import org.springframework.stereotype.Component import java.lang.Deprecated import java.time.Clock import java.time.Duration @@ -67,10 +72,19 @@ import java.time.Duration.ZERO import java.time.Instant import java.time.temporal.TemporalAmount import java.util.concurrent.TimeUnit +import kotlin.Exception +import kotlin.IllegalStateException +import kotlin.Int +import kotlin.Long +import kotlin.String +import kotlin.TODO +import kotlin.Unit +import kotlin.also import kotlin.collections.set -import org.apache.commons.lang3.time.DurationFormatUtils -import org.slf4j.MDC -import org.springframework.stereotype.Component +import kotlin.let +import kotlin.run +import kotlin.to +import kotlin.toString @Component class RunTaskHandler( @@ -84,7 +98,8 @@ class RunTaskHandler( private val exceptionHandlers: List, private val taskExecutionInterceptors: List, private val registry: Registry, - private val dynamicConfigService: DynamicConfigService + private val dynamicConfigService: DynamicConfigService, + private val retriableLock: RetriableLock, ) : OrcaMessageHandler, ExpressionAware, AuthenticationAware { /** @@ -99,89 +114,135 @@ class RunTaskHandler( ) override fun handle(message: RunTask) { - message.withTask { origStage, taskModel, task -> - var stage = origStage - - stage.withAuth { - stage.withLoggingContext(taskModel) { - if (task.javaClass.isAnnotationPresent(Deprecated::class.java)) { - log.warn("deprecated-task-run ${task.javaClass.simpleName}") - } - val thisInvocationStartTimeMs = clock.millis() - val execution = stage.execution - var taskResult: TaskResult? = null - var taskException: Exception? = null - try { - taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) } + message.withLocking { + message.withTask { origStage, taskModel, task -> + var stage = origStage + stage.withAuth { + stage.withLoggingContext(taskModel) { + if (task.javaClass.isAnnotationPresent(Deprecated::class.java)) { + log.warn("deprecated-task-run ${task.javaClass.simpleName}") + } + val thisInvocationStartTimeMs = clock.millis() + val execution = stage.execution + var taskResult: TaskResult? = null - if (execution.isCanceled) { - task.onCancelWithResult(stage)?.run { - stage.processTaskOutput(this) - } - queue.push(CompleteTask(message, CANCELED)) - } else if (execution.status.isComplete) { - queue.push(CompleteTask(message, CANCELED)) - } else if (execution.status == PAUSED) { - queue.push(PauseTask(message)) - } else if (stage.isManuallySkipped()) { - queue.push(CompleteTask(message, SKIPPED)) - } else { - try { - task.checkForTimeout(stage, taskModel, message) - } catch (e: TimeoutException) { - registry - .timeoutCounter(stage.execution.type, stage.execution.application, stage.type, taskModel.name) - .increment() - taskResult = task.onTimeout(stage) + var taskException: Exception? = null + try { + taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) } - if (taskResult == null) { - // This means this task doesn't care to alter the timeout flow, just throw - throw e + if (execution.isCanceled) { + task.onCancelWithResult(stage)?.run { + stage.processTaskOutput(this) } + queue.push(CompleteTask(message, CANCELED)) + } else if (execution.status.isComplete) { + queue.push(CompleteTask(message, CANCELED)) + } else if (execution.status == PAUSED) { + queue.push(PauseTask(message)) + } else if (stage.isManuallySkipped()) { + queue.push(CompleteTask(message, SKIPPED)) + } else { + try { + task.checkForTimeout(stage, taskModel, message) + } catch (e: TimeoutException) { + registry + .timeoutCounter(stage.execution.type, stage.execution.application, stage.type, taskModel.name) + .increment() + taskResult = task.onTimeout(stage) + + if (taskResult == null) { + // This means this task doesn't care to alter the timeout flow, just throw + throw e + } - if (!setOf(TERMINAL, FAILED_CONTINUE).contains(taskResult.status)) { - log.error("Task ${task.javaClass.name} returned invalid status (${taskResult.status}) for onTimeout") - throw e + if (!setOf(TERMINAL, FAILED_CONTINUE).contains(taskResult.status)) { + log.error("Task ${task.javaClass.name} returned invalid status (${taskResult.status}) for onTimeout") + throw e + } } - } - if (taskResult == null) { - taskResult = task.execute(stage.withMergedContext()) - taskExecutionInterceptors.forEach { t -> taskResult = t.afterTaskExecution(task, stage, taskResult) } - } + if (taskResult == null) { + taskResult = task.execute(stage.withMergedContext()) + taskExecutionInterceptors.forEach { t -> taskResult = t.afterTaskExecution(task, stage, taskResult) } + } - taskResult!!.let { result: TaskResult -> - when (result.status) { - RUNNING -> { - stage.processTaskOutput(result) - queue.push(message, task.backoffPeriod(taskModel, stage)) - trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status) - } - SUCCEEDED, REDIRECT, SKIPPED, FAILED_CONTINUE, STOPPED -> { - stage.processTaskOutput(result) - queue.push(CompleteTask(message, result.status)) - trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status) - } - CANCELED -> { - stage.processTaskOutput(result.mergeOutputs(task.onCancelWithResult(stage))) - val status = stage.failureStatus(default = result.status) - queue.push(CompleteTask(message, status, result.status)) - trackResult(stage, thisInvocationStartTimeMs, taskModel, status) - } - TERMINAL -> { - stage.processTaskOutput(result) - val status = stage.failureStatus(default = result.status) - queue.push(CompleteTask(message, status, result.status)) - trackResult(stage, thisInvocationStartTimeMs, taskModel, status) + taskResult!!.let { result: TaskResult -> + when (result.status) { + RUNNING -> { + stage.processTaskOutput(result) + queue.push(message, task.backoffPeriod(taskModel, stage)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status) + } + + SUCCEEDED, REDIRECT, SKIPPED, FAILED_CONTINUE, STOPPED -> { + stage.processTaskOutput(result) + queue.push(CompleteTask(message, result.status)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status) + } + + CANCELED -> { + stage.processTaskOutput(result.mergeOutputs(task.onCancelWithResult(stage))) + val status = stage.failureStatus(default = result.status) + queue.push(CompleteTask(message, status, result.status)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, status) + } + + TERMINAL -> { + stage.processTaskOutput(result) + val status = stage.failureStatus(default = result.status) + queue.push(CompleteTask(message, status, result.status)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, status) + } + + else -> { + stage.processTaskOutput(result) + TODO("Unhandled task status ${result.status}") + } } - else -> { - stage.processTaskOutput(result) - TODO("Unhandled task status ${result.status}") + } + } + } catch (e: Exception) { + taskException = e; + val exceptionDetails = exceptionHandlers.shouldRetry(e, taskModel.name) + if (exceptionDetails?.shouldRetry == true) { + log.warn("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]") + queue.push(message, task.backoffPeriod(taskModel, stage)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, RUNNING) + } else if (e is TimeoutException && stage.context["markSuccessfulOnTimeout"] == true) { + trackResult(stage, thisInvocationStartTimeMs, taskModel, SUCCEEDED) + queue.push(CompleteTask(message, SUCCEEDED)) + } else { + if (e !is TimeoutException) { + if (e is UserException) { + log.warn( + "${message.taskType.simpleName} for ${message.executionType}[${message.executionId}] failed, likely due to user error", + e + ) + } else { + log.error( + "Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]", + e + ) } } + val status = stage.failureStatus(default = TERMINAL) + stage.context["exception"] = exceptionDetails + repository.storeStage(stage) + queue.push(CompleteTask(message, status, TERMINAL)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, status) + } + } finally { + taskExecutionInterceptors.forEach { t -> + t.finallyAfterTaskExecution( + task, + stage, + taskResult, + taskException + ) } } +<<<<<<< HEAD } catch (e: Exception) { taskException = e; val exceptionDetails = exceptionHandlers.shouldRetry(e, taskModel.name) @@ -208,6 +269,8 @@ class RunTaskHandler( } } finally { taskExecutionInterceptors.forEach { t -> t.finallyAfterTaskExecution(task, stage, taskResult, taskException) } +======= +>>>>>>> 706ffb425 (Fix/manual judgment concurrent execution (#4410)) } } } @@ -257,6 +320,15 @@ class RunTaskHandler( } } + private fun RunTask.withLocking(action: Runnable) { + var lockOptions = RetriableLockOptions(this.stageId) + val lockAcquired = retriableLock.lock(lockOptions, action) + if(!lockAcquired){ + log.warn("Failed to obtain lock for stage: {}. Pushing original message back to queue"); + queue.push(this) + } + } + private fun Task.backoffPeriod(taskModel: TaskExecution, stage: StageExecution): TemporalAmount = when (this) { is RetryableTask -> Duration.ofMillis( diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt index 252aa8f63f..06eb147f2f 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt @@ -19,18 +19,11 @@ package com.netflix.spinnaker.orca.q.handler import com.netflix.spectator.api.NoopRegistry import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService import com.netflix.spinnaker.orca.DefaultStageResolver -import com.netflix.spinnaker.orca.api.pipeline.TaskExecutionInterceptor import com.netflix.spinnaker.orca.TaskResolver import com.netflix.spinnaker.orca.api.pipeline.Task +import com.netflix.spinnaker.orca.api.pipeline.TaskExecutionInterceptor import com.netflix.spinnaker.orca.api.pipeline.TaskResult -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.CANCELED -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.FAILED_CONTINUE -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.PAUSED -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.RUNNING -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.SKIPPED -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.STOPPED -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.SUCCEEDED -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.TERMINAL +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.* import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution.PausedDetails import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution @@ -38,6 +31,7 @@ import com.netflix.spinnaker.orca.api.test.pipeline import com.netflix.spinnaker.orca.api.test.stage import com.netflix.spinnaker.orca.api.test.task import com.netflix.spinnaker.orca.exceptions.ExceptionHandler +import com.netflix.spinnaker.orca.lock.RetriableLock import com.netflix.spinnaker.orca.pipeline.DefaultStageDefinitionBuilderFactory import com.netflix.spinnaker.orca.pipeline.RestrictExecutionDuringTimeWindow import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger @@ -46,6 +40,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor import com.netflix.spinnaker.orca.pipeline.util.StageNavigator +<<<<<<< HEAD import com.netflix.spinnaker.orca.q.CompleteTask import com.netflix.spinnaker.orca.q.DummyCloudProviderAwareTask import com.netflix.spinnaker.orca.q.DummyTask @@ -57,29 +52,12 @@ import com.netflix.spinnaker.orca.q.RunTask import com.netflix.spinnaker.orca.q.StageDefinitionBuildersProvider import com.netflix.spinnaker.orca.q.TasksProvider import com.netflix.spinnaker.orca.q.singleTaskStage +======= +import com.netflix.spinnaker.orca.q.* +>>>>>>> 706ffb425 (Fix/manual judgment concurrent execution (#4410)) import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.spek.and -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.anyOrNull -import com.nhaarman.mockito_kotlin.check -import com.nhaarman.mockito_kotlin.doReturn -import com.nhaarman.mockito_kotlin.doThrow -import com.nhaarman.mockito_kotlin.eq -import com.nhaarman.mockito_kotlin.isA -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.never -import com.nhaarman.mockito_kotlin.reset -import com.nhaarman.mockito_kotlin.times -import com.nhaarman.mockito_kotlin.verify -import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions -import com.nhaarman.mockito_kotlin.whenever -import java.time.Clock -import java.time.Duration -import java.time.Instant -import java.time.ZoneId -import java.time.temporal.ChronoUnit -import kotlin.collections.set -import kotlin.reflect.jvm.jvmName +import com.nhaarman.mockito_kotlin.* import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given @@ -87,7 +65,18 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP import org.jetbrains.spek.subject.SubjectSpek +<<<<<<< HEAD +======= +import org.mockito.stubbing.Answer +>>>>>>> 706ffb425 (Fix/manual judgment concurrent execution (#4410)) import org.threeten.extra.Minutes +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.temporal.ChronoUnit +import kotlin.collections.set +import kotlin.reflect.jvm.jvmName object RunTaskHandlerTest : SubjectSpek({ @@ -113,6 +102,7 @@ object RunTaskHandlerTest : SubjectSpek({ val clock = Clock.fixed(Instant.now().truncatedTo(ChronoUnit.MILLIS), ZoneId.systemDefault()) val contextParameterProcessor = ContextParameterProcessor() val dynamicConfigService: DynamicConfigService = mock() + val retriableLock: RetriableLock = mock() val taskExecutionInterceptors: List = listOf(mock()) val stageResolver = DefaultStageResolver(StageDefinitionBuildersProvider(emptyList())) @@ -130,11 +120,12 @@ object RunTaskHandlerTest : SubjectSpek({ listOf(exceptionHandler), taskExecutionInterceptors, NoopRegistry(), - dynamicConfigService + dynamicConfigService, + retriableLock ) } - fun resetMocks() = reset(queue, repository, task, timeoutOverrideTask, cloudProviderAwareTask, exceptionHandler) + fun resetMocks() = reset(queue, repository, task, timeoutOverrideTask, cloudProviderAwareTask, exceptionHandler, retriableLock) describe("running a task") { @@ -160,6 +151,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -195,6 +187,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -222,6 +215,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -252,6 +246,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -295,6 +290,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -330,6 +326,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -357,6 +354,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -387,6 +385,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -432,6 +431,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(task.getDynamicBackoffPeriod(any(), any())) doReturn taskBackoffMs whenever(dynamicConfigService.getConfig(eq(Long::class.java), eq("tasks.global.backOffPeriod"), any())) doReturn 0L whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -468,6 +468,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -494,6 +495,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -521,6 +523,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -573,6 +576,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -610,6 +614,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -640,6 +645,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -675,6 +681,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -707,6 +714,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -755,6 +763,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -801,6 +810,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -841,6 +851,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -880,6 +891,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -914,6 +926,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -955,6 +968,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -992,6 +1006,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1033,6 +1048,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1082,6 +1098,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1125,6 +1142,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1165,6 +1183,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() whenever(task.onTimeout(any())) doReturn TaskResult.ofStatus(FAILED_CONTINUE) + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1189,6 +1208,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() whenever(task.onTimeout(any())) doReturn TaskResult.ofStatus(TERMINAL) + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1212,6 +1232,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() whenever(task.onTimeout(any())) doReturn TaskResult.ofStatus(SUCCEEDED) + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1260,6 +1281,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1297,6 +1319,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1341,6 +1364,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1377,6 +1401,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1417,6 +1442,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1459,6 +1485,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(timeoutOverrideTask, stage, taskResult)) doReturn taskResult } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline whenever(timeoutOverrideTask.timeout) doReturn timeout.toMillis() + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1507,6 +1534,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1554,6 +1582,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1610,6 +1639,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1652,6 +1682,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1696,6 +1727,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1729,6 +1761,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1769,6 +1802,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + setupRetriableLock(true, retriableLock) } afterGroup(::resetMocks) @@ -1823,7 +1857,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(cloudProviderAwareTask.hasCredentials(any())) doReturn true whenever(cloudProviderAwareTask.getCredentials(any())) doReturn "someAccount" whenever(dynamicConfigService.getConfig(eq(Long::class.java), eq("tasks.aws.someAccount.backOffPeriod"), any())) doReturn backOff.accountBackOffMs - + setupRetriableLock(true, retriableLock) whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline } @@ -1839,8 +1873,57 @@ object RunTaskHandlerTest : SubjectSpek({ } } } +<<<<<<< HEAD +======= + + describe("locking stage before handing a task") { + given("lock is already taken and thread cannot acquire it") { + + val pipeline = pipeline { + stage { + type = "whatever" + task { + id = "1" + implementingClass = RunTask::class.jvmName + startTime = clock.instant().toEpochMilli() + } + } + } + val stage = pipeline.stages.first() + val message = RunTask(pipeline.type, pipeline.id, "foo", stage.id, "1", DummyCloudProviderAwareTask::class.java) + + beforeGroup{ + tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } + setupRetriableLock(false, retriableLock) + } + + afterGroup(::resetMocks) + + on("the handler receives a message") { + subject.handle(message) + + it("pushes original message back to queue for processing") { + verify(queue).push(message) + } + } + } + } +>>>>>>> 706ffb425 (Fix/manual judgment concurrent execution (#4410)) }) +fun setupRetriableLock(acquireLock: Boolean, lock: RetriableLock){ + if(acquireLock){ + val runnableCaptor = argumentCaptor() + val answer: Answer = Answer { + runnableCaptor.firstValue.run() + true + } + whenever(lock.lock(any(), runnableCaptor.capture())).thenAnswer(answer) + } else { + whenever(lock.lock(any(), any())).thenReturn(false); + } +} + data class BackOff( val taskBackOffMs: Long, val globalBackOffMs: Long, diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java index 2beea47f2e..4d9765aff1 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/config/RedisConfiguration.java @@ -15,17 +15,27 @@ */ package com.netflix.spinnaker.orca.config; +import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.spectator.api.Registry; import com.netflix.spinnaker.kork.jedis.JedisClientConfiguration; import com.netflix.spinnaker.kork.jedis.RedisClientSelector; +import com.netflix.spinnaker.kork.jedis.lock.RedisLockManager; +import com.netflix.spinnaker.kork.lock.LockManager; import com.netflix.spinnaker.kork.telemetry.InstrumentedProxy; +import com.netflix.spinnaker.orca.lock.RunOnLockAcquired; +import com.netflix.spinnaker.orca.lock.RunOnRedisLockAcquired; import com.netflix.spinnaker.orca.notifications.NotificationClusterLock; import com.netflix.spinnaker.orca.notifications.RedisClusterNotificationClusterLock; import com.netflix.spinnaker.orca.notifications.RedisNotificationClusterLock; import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; import com.netflix.spinnaker.orca.pipeline.persistence.jedis.RedisExecutionRepository; +import groovy.util.logging.Slf4j; +import java.time.Clock; import java.util.Collections; +import java.util.Optional; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -36,11 +46,14 @@ import redis.clients.jedis.JedisPoolConfig; import rx.Scheduler; +@Slf4j @Configuration @ConditionalOnProperty(value = "redis.enabled", matchIfMissing = true) @Import({JedisClientConfiguration.class, JedisConfiguration.class}) public class RedisConfiguration { + private static final Logger log = LoggerFactory.getLogger(RedisConfiguration.class); + public static class Clients { public static final String EXECUTION_REPOSITORY = "executionRepository"; public static final String TASK_QUEUE = "taskQueue"; @@ -94,4 +107,29 @@ public GenericObjectPoolConfig redisPoolConfig() { config.setMinIdle(25); return config; } + + @Bean + @Primary + @ConditionalOnProperty(value = "redis.external-lock.enabled") + public RunOnLockAcquired redisRunOnLockAcquired(LockManager lockManager) { + log.info("Redis distributed locking enabled"); + return new RunOnRedisLockAcquired(lockManager); + } + + @Bean + @ConditionalOnProperty(value = "redis.external-lock.enabled") + public LockManager lockManager( + Clock clock, + Registry registry, + ObjectMapper mapper, + RedisClientSelector redisClientSelector) { + return new RedisLockManager( + "RedisLockManager", + clock, + registry, + mapper, + redisClientSelector.primary("executionRepository"), + Optional.empty(), + Optional.empty()); + } } diff --git a/orca-sql/orca-sql.gradle b/orca-sql/orca-sql.gradle index c7fe04c057..5f3c2d76f0 100644 --- a/orca-sql/orca-sql.gradle +++ b/orca-sql/orca-sql.gradle @@ -36,6 +36,8 @@ dependencies { implementation("org.liquibase:liquibase-core") implementation("com.zaxxer:HikariCP") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") + implementation("net.javacrumbs.shedlock:shedlock-spring:4.44.0") + implementation("net.javacrumbs.shedlock:shedlock-provider-jdbc-template:4.44.0") testImplementation(project(":orca-core-tck")) testImplementation(project(":orca-test-groovy")) diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt index 6bc15e32f1..d6ae9f9ea0 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt @@ -23,6 +23,8 @@ import com.netflix.spinnaker.kork.sql.config.SqlProperties import com.netflix.spinnaker.kork.telemetry.InstrumentedProxy import com.netflix.spinnaker.orca.api.pipeline.persistence.ExecutionRepositoryListener import com.netflix.spinnaker.orca.interlink.Interlink +import com.netflix.spinnaker.orca.lock.RunOnLockAcquired +import com.netflix.spinnaker.orca.lock.RunOnShedLockAcquired import com.netflix.spinnaker.orca.notifications.NotificationClusterLock import com.netflix.spinnaker.orca.notifications.SqlNotificationClusterLock import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository @@ -32,20 +34,21 @@ import com.netflix.spinnaker.orca.sql.SqlHealthcheckActivator import com.netflix.spinnaker.orca.sql.pipeline.persistence.ExecutionStatisticsRepository import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository import com.netflix.spinnaker.orca.sql.telemetry.SqlActiveExecutionsMonitor -import java.time.Clock -import java.util.Optional import liquibase.integration.spring.SpringLiquibase +import net.javacrumbs.shedlock.core.LockProvider +import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider import org.jooq.DSLContext +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration -import org.springframework.context.annotation.Import -import org.springframework.context.annotation.Primary +import org.springframework.context.annotation.* +import java.time.Clock +import java.util.* +import javax.sql.DataSource @Configuration @ConditionalOnProperty("sql.enabled") @@ -54,6 +57,8 @@ import org.springframework.context.annotation.Primary @ComponentScan("com.netflix.spinnaker.orca.sql") class SqlConfiguration { + + private val log: Logger = LoggerFactory.getLogger(SqlConfiguration::class.java) @Bean fun liquibase(properties: SqlProperties): SpringLiquibase = SpringLiquibaseProxy(properties) @@ -138,4 +143,18 @@ class SqlConfiguration { clock = clock, retryProperties = properties.retries.transactions ) + + + @Bean + @ConditionalOnProperty("sql.external-lock.enabled") + fun sqlRunOnLockAcquired(lockProvider: LockProvider): RunOnLockAcquired { + log.info("SQL distributed locking enabled") + return RunOnShedLockAcquired(lockProvider) + } + + @Bean + @ConditionalOnProperty("sql.external-lock.enabled") + fun lockProvider(datasource: DataSource): LockProvider { + return JdbcTemplateLockProvider(datasource) + } } diff --git a/orca-sql/src/main/resources/db/changelog-master.yml b/orca-sql/src/main/resources/db/changelog-master.yml index 5bda7e5da2..d1e2c0b01d 100644 --- a/orca-sql/src/main/resources/db/changelog-master.yml +++ b/orca-sql/src/main/resources/db/changelog-master.yml @@ -53,3 +53,6 @@ databaseChangeLog: - include: file: changelog/20200603-deleted-executions-table-not-mysql.yml relativeToChangelogFile: true +- include: + file: changelog/20230309-shedlock.yml + relativeToChangelogFile: true diff --git a/orca-sql/src/main/resources/db/changelog/20230309-shedlock.yml b/orca-sql/src/main/resources/db/changelog/20230309-shedlock.yml new file mode 100644 index 0000000000..62b44e7e50 --- /dev/null +++ b/orca-sql/src/main/resources/db/changelog/20230309-shedlock.yml @@ -0,0 +1,29 @@ +databaseChangeLog: + - changeSet: + id: create-shedlock-table-required-for-sql-based-distributed-locking + author: kkotula + changes: + - createTable: + tableName: shedlock + columns: + - column: + name: name + type: varchar(64) + constraints: + primaryKey: true + nullable: false + - column: + name: lock_until + type: timestamp + constraints: + nullable: true + - column: + name: locked_at + type: timestamp + constraints: + nullable: true + - column: + name: locked_by + type: varchar(255) + constraints: + nullable: false