From 5fa12732a9cc41b479ba06d5a1030ec866da60a1 Mon Sep 17 00:00:00 2001 From: Florian Sellmayr Date: Sun, 16 Oct 2016 13:45:10 +0200 Subject: [PATCH] Compress inherited step-result-update events to prevent filling up the event-bus publisher when lots of step-result updates happen in a short time. Fixes #135) --- CHANGELOG.md | 7 ++++++ src/clj/lambdacd/internal/execution.clj | 22 ++++++++++--------- test/clj/lambdacd/steps/support_test.clj | 27 ++++++++++++++++++++++-- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 801d859a..42fdc267 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ However, as this is still an experimental library, breaking changes may occur wi The official release will have a defined and more stable API. If you are already relying on a particular API, please let me know. +## 0.11.1 +* Bug fixes: + * Fix deadlock occurring when steps write a lot of step-results in quick succession and step results are inherited by their parents (as in chaining) (#135) +* Breaking changes: + * The fix for #135 changes the behavior of step result inheritance by introducing a sliding window that compresses several step result update events into one: Steps inheriting their childens results via the `:unify-status-fn` or `:unify-results-fn` (e.g. chaining steps) might not pass on intermediate update events; the ultimately resulting unified step result will remain the same. + + ## 0.11.0 * Keeps a history of pipeline structure if persistence component supports it (#131, #6) diff --git a/src/clj/lambdacd/internal/execution.clj b/src/clj/lambdacd/internal/execution.clj index 69f8a83c..bcfc376c 100644 --- a/src/clj/lambdacd/internal/execution.clj +++ b/src/clj/lambdacd/internal/execution.clj @@ -161,16 +161,18 @@ (defn- process-inheritance [out-ch step-results-channel unify-results-fn] (async/go - (loop [results {}] - (if-let [{step-id :step-id - step-result :step-result} (async/! out-ch new-unified)) - (recur new-results)) - (async/close! out-ch))))) + (let [dropping-output-ch (async/chan (async/sliding-buffer 1))] + (async/pipe dropping-output-ch out-ch) + (loop [results {}] + (if-let [{step-id :step-id + step-result :step-result} (async/! dropping-output-ch new-unified)) + (recur new-results)) + (async/close! dropping-output-ch)))))) (defn contexts-for-steps "creates contexts for steps" diff --git a/test/clj/lambdacd/steps/support_test.clj b/test/clj/lambdacd/steps/support_test.clj index 2bda8c23..420394df 100644 --- a/test/clj/lambdacd/steps/support_test.clj +++ b/test/clj/lambdacd/steps/support_test.clj @@ -1,10 +1,12 @@ (ns lambdacd.steps.support-test (:require [clojure.test :refer :all] [lambdacd.steps.support :as step-support :refer :all] - [lambdacd.testsupport.test-util :refer [slurp-chan]] + [lambdacd.testsupport.test-util :refer [slurp-chan call-with-timeout]] [lambdacd.testsupport.data :refer [some-ctx some-ctx-with]] [lambdacd.testsupport.matchers :refer [map-containing]] - [clojure.core.async :as async])) + [clojure.core.async :as async] + [lambdacd.testsupport.noop-pipeline-state :as noop-pipeline-state] + [lambdacd.execution :as execution])) (defn some-step [args ctx] {:status :success :foo :bar}) @@ -244,6 +246,27 @@ `(1 42) {:status :failure}} :foo :bar}))))) +(defn output-load-test-ctx [] + (some-ctx-with :pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))) + +(defn log-lots-of-output [args ctx] + (doall (for [i (range 800)] + (if-not-killed ctx + (async/>!! (:result-channel ctx) [:xyz i])))) + {:status :success}) + +(defn log-lots-of-output-in-chaining [args ctx] + (chaining args ctx + (log-lots-of-output injected-args injected-ctx))) + +(deftest output-stress-test ; reproduces #135 + (testing "that we don't fail if we come across lots of output for just in general" + (is (= :success (:status (call-with-timeout 10000 + (execution/execute-step {} [(output-load-test-ctx) log-lots-of-output])))))) + (testing "that we don't fail if we come across lots of output for chaining" + (is (= :success (:status (call-with-timeout 10000 + (execution/execute-step {} [(output-load-test-ctx) log-lots-of-output-in-chaining]))))))) + (deftest if-not-killed-test (testing "that the body will only be executed if step is still alive" (let [killed-ctx (some-ctx-with :is-killed (atom true))