Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: have flightGroups take some time to allow reuse #286

Open
wants to merge 3 commits into
base: v8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ For **Duration** settings, the value should be be an integer followed by `ms`, `
| `logLevel` | `LOG_LEVEL` | String | `info` | Should be `debug`, `info`, `warn`, `error`, or `none`. To learn more, read [Logging](./logging.md). |
| `bigSegmentsStaleAsDegraded` | `BIG_SEGMENTS_STALE_AS_DEGRADED` | Boolean | `false` | Indicates if environments should be considered degraded if Big Segments are not fully synchronized. |
| `bigSegmentsStaleThreshold` | `BIG_SEGMENTS_STALE_THRESHOLD` | Duration | `5m` | Indicates how long until Big Segments should be considered stale. |
| n/a | `BATCH_FETCH_PERIOD` | Duration | `0` | The minimum latency between bulk fetching all data for a batch of new clients. Used only in proxy mode for streaming clients. Useful for reducing memory when under heavy load, as many clients can share a single data fetch. |

_(1)_ The default values for `streamUri`, `baseUri`, and `clientSideBaseUri` are `https://stream.launchdarkly.com`, `https://sdk.launchdarkly.com`, and `https://clientsdk.launchdarkly.com`, respectively. You should never need to change these URIs unless you are either using a special instance of the LaunchDarkly service, in which case Support will tell you how to set them, or you are accessing LaunchDarkly using a reverse proxy or some other mechanism that rewrites URLs.

Expand Down
20 changes: 19 additions & 1 deletion internal/streams/stream_provider_server_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package streams

import (
"net/http"
"os"
"sync"
"time"

"github.com/launchdarkly/ld-relay/v8/internal/sdkauth"

Expand Down Expand Up @@ -31,7 +33,8 @@ type serverSideEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

flightGroup singleflight.Group
flightGroup singleflight.Group
previousFlight time.Time
}

func (s *serverSideStreamProvider) Handler(credential sdkauth.ScopedCredential) http.HandlerFunc {
Expand Down Expand Up @@ -109,6 +112,20 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou
// getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only called when we have a new or reconnecting client.
So some small delay here should be acceptable.

func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, error) {
data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) {
// We do not want to call this flight group too often, as it can use a lot of RAM.
// This will ensure that we don't call it more than once every BATCH_FETCH_PERIOD.
delayS, has := os.LookupEnv("BATCH_FETCH_PERIOD")
if has {
if delay, err := time.ParseDuration(delayS); err == nil {
if time.Since(r.previousFlight) < delay {
time.Sleep(delay - time.Since(r.previousFlight))
}
} else {
r.loggers.Warnf("Ignoring invalid BATCH_FETCH_PERIOD: %s\n", delayS)
}
r.previousFlight = time.Now()
}

flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
Expand All @@ -126,6 +143,7 @@ func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, err
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}

// This call uses a lot of system resources (RAM in particular).
event := MakeServerSidePutEvent(allData)
return event, nil
})
Expand Down