Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
Compress inherited step-result-update events to prevent filling up th…
Browse files Browse the repository at this point in the history
…e event-bus publisher when lots of step-result updates happen in a short time. Fixes #135)
  • Loading branch information
flosell committed Oct 16, 2016
1 parent a3043db commit 5fa1273
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ However, as this is still an experimental library, breaking changes may occur wi

The official release will have a defined and more stable API. If you are already relying on a particular API, please let me know.

## 0.11.1
* Bug fixes:
* Fix deadlock occurring when steps write a lot of step-results in quick succession and step results are inherited by their parents (as in chaining) (#135)
* Breaking changes:
* The fix for #135 changes the behavior of step result inheritance by introducing a sliding window that compresses several step result update events into one: Steps inheriting their childens results via the `:unify-status-fn` or `:unify-results-fn` (e.g. chaining steps) might not pass on intermediate update events; the ultimately resulting unified step result will remain the same.


## 0.11.0

* Keeps a history of pipeline structure if persistence component supports it (#131, #6)
Expand Down
22 changes: 12 additions & 10 deletions src/clj/lambdacd/internal/execution.clj
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,18 @@

(defn- process-inheritance [out-ch step-results-channel unify-results-fn]
(async/go
(loop [results {}]
(if-let [{step-id :step-id
step-result :step-result} (async/<! step-results-channel)]
(let [new-results (assoc results step-id step-result)
old-unified (unify-results-fn results)
new-unified (unify-results-fn new-results)]
(if (not= old-unified new-unified)
(async/>! out-ch new-unified))
(recur new-results))
(async/close! out-ch)))))
(let [dropping-output-ch (async/chan (async/sliding-buffer 1))]
(async/pipe dropping-output-ch out-ch)
(loop [results {}]
(if-let [{step-id :step-id
step-result :step-result} (async/<! step-results-channel)]
(let [new-results (assoc results step-id step-result)
old-unified (unify-results-fn results)
new-unified (unify-results-fn new-results)]
(if (not= old-unified new-unified)
(async/>! dropping-output-ch new-unified))
(recur new-results))
(async/close! dropping-output-ch))))))

(defn contexts-for-steps
"creates contexts for steps"
Expand Down
27 changes: 25 additions & 2 deletions test/clj/lambdacd/steps/support_test.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
(ns lambdacd.steps.support-test
(:require [clojure.test :refer :all]
[lambdacd.steps.support :as step-support :refer :all]
[lambdacd.testsupport.test-util :refer [slurp-chan]]
[lambdacd.testsupport.test-util :refer [slurp-chan call-with-timeout]]
[lambdacd.testsupport.data :refer [some-ctx some-ctx-with]]
[lambdacd.testsupport.matchers :refer [map-containing]]
[clojure.core.async :as async]))
[clojure.core.async :as async]
[lambdacd.testsupport.noop-pipeline-state :as noop-pipeline-state]
[lambdacd.execution :as execution]))

(defn some-step [args ctx]
{:status :success :foo :bar})
Expand Down Expand Up @@ -244,6 +246,27 @@
`(1 42) {:status :failure}}
:foo :bar})))))

(defn output-load-test-ctx []
(some-ctx-with :pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state)))

(defn log-lots-of-output [args ctx]
(doall (for [i (range 800)]
(if-not-killed ctx
(async/>!! (:result-channel ctx) [:xyz i]))))
{:status :success})

(defn log-lots-of-output-in-chaining [args ctx]
(chaining args ctx
(log-lots-of-output injected-args injected-ctx)))

(deftest output-stress-test ; reproduces #135
(testing "that we don't fail if we come across lots of output for just in general"
(is (= :success (:status (call-with-timeout 10000
(execution/execute-step {} [(output-load-test-ctx) log-lots-of-output]))))))
(testing "that we don't fail if we come across lots of output for chaining"
(is (= :success (:status (call-with-timeout 10000
(execution/execute-step {} [(output-load-test-ctx) log-lots-of-output-in-chaining])))))))

(deftest if-not-killed-test
(testing "that the body will only be executed if step is still alive"
(let [killed-ctx (some-ctx-with :is-killed (atom true))
Expand Down

0 comments on commit 5fa1273

Please sign in to comment.