From f517e617e86345af7d7179eee44482a65e50135f Mon Sep 17 00:00:00 2001 From: Moshe Good Date: Fri, 16 Feb 2024 15:04:05 -0500 Subject: [PATCH] Option to delay fetching bulk data between batches of new clients --- docs/configuration.md | 2 +- .../streams/stream_provider_server_side.go | 29 ++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index ca4a4437..0e22b03c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -65,7 +65,7 @@ For **Duration** settings, the value should be be an integer followed by `ms`, ` | `logLevel` | `LOG_LEVEL` | String | `info` | Should be `debug`, `info`, `warn`, `error`, or `none`. To learn more, read [Logging](./logging.md). | | `bigSegmentsStaleAsDegraded` | `BIG_SEGMENTS_STALE_AS_DEGRADED` | Boolean | `false` | Indicates if environments should be considered degraded if Big Segments are not fully synchronized. | | `bigSegmentsStaleThreshold` | `BIG_SEGMENTS_STALE_THRESHOLD` | Duration | `5m` | Indicates how long until Big Segments should be considered stale. | -| n/a | `STREAMING_MIN_DELAY` | Duration | `0` | The minimum latency of responding to a new client connection. Used only in proxy mode for streaming clients. Useful for reducing memory when under heavy load, as many clients can share a single data fetch. | +| n/a | `BATCH_FETCH_PERIOD` | Duration | `0` | The minimum latency between bulk fetching all data for a batch of new clients. Used only in proxy mode for streaming clients. Useful for reducing memory when under heavy load, as many clients can share a single data fetch. | _(1)_ The default values for `streamUri`, `baseUri`, and `clientSideBaseUri` are `https://stream.launchdarkly.com`, `https://sdk.launchdarkly.com`, and `https://clientsdk.launchdarkly.com`, respectively. You should never need to change these URIs unless you are either using a special instance of the LaunchDarkly service, in which case Support will tell you how to set them, or you are accessing LaunchDarkly using a reverse proxy or some other mechanism that rewrites URLs. diff --git a/internal/streams/stream_provider_server_side.go b/internal/streams/stream_provider_server_side.go index 70935b1e..a6f58890 100644 --- a/internal/streams/stream_provider_server_side.go +++ b/internal/streams/stream_provider_server_side.go @@ -33,7 +33,8 @@ type serverSideEnvStreamRepository struct { store EnvStoreQueries loggers ldlog.Loggers - flightGroup singleflight.Group + flightGroup singleflight.Group + previousFlight time.Time } func (s *serverSideStreamProvider) Handler(credential sdkauth.ScopedCredential) http.HandlerFunc { @@ -111,7 +112,20 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou // getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay. func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, error) { data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) { - start := time.Now() + // We do not want to call this flight group too often, as it can use a lot of RAM. + // This will ensure that we don't call it more than once every BATCH_FETCH_PERIOD. + delayS, has := os.LookupEnv("BATCH_FETCH_PERIOD") + if has { + if delay, err := time.ParseDuration(delayS); err == nil { + if time.Since(r.previousFlight) < delay { + time.Sleep(delay - time.Since(r.previousFlight)) + } + } else { + r.loggers.Warnf("Ignoring invalid BATCH_FETCH_PERIOD: %s\n", delayS) + } + r.previousFlight = time.Now() + } + flags, err := r.store.GetAll(ldstoreimpl.Features()) if err != nil { @@ -131,17 +145,6 @@ func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, err // This call uses a lot of system resources (RAM in particular). event := MakeServerSidePutEvent(allData) - // So we sleep for a bit to allow a bunch of concurrent calls to - // all make use of this same flightGroup. - delayS, has := os.LookupEnv("STREAMING_MIN_DELAY") - if has { - if delay, err := time.ParseDuration(delayS); err == nil { - time.Sleep(delay - time.Since(start)) - } else { - r.loggers.Warnf("Ignoring invalid STREAMING_MIN_DELAY: %s\n", delayS) - } - } - return event, nil })