Add draft of throttled copy #258

Open · wants to merge 11 commits into master
Conversation

@bearrito (Contributor) commented Mar 3, 2024:

Adds the ability to do throttled copy operations when filling the cache.
Problem
Basically I'm trying to replicate what curl offers: https://everything.curl.dev/usingcurl/transfers/rate-limiting

We had previously discussed doing this on the store/server side. I'm not sure that works for our case.

  • We have many devices, 500-1500 per facility (warehouse).

  • Each device is generally doing work, so we need to dribble in updates in the background.

  • Devices are wifi-aware, so devices in a better part of the facility can stream faster.

  • The store has no knowledge of which IP maps to which device, so it wouldn't know whether a device is busy or in a good wifi area.

Approach
I think I took a pretty standard approach for rate-limiting workers. It would have been better if I had been able to get the size of the chunks: right now, waiting a fixed period essentially just limits the number of chunks per second, which gets you roughly (average chunk size) bytes per second. The upside of this approach is that the workers don't need any coordination to sum up bytes.
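Roughly, the per-worker loop looks like this (a simplified sketch, not the exact PR code; throttlePeriod is the fixed wait):

```go
// Simplified sketch of the worker loop: one fixed wait per chunk, so the
// effective rate is about (average chunk size) bytes per throttle period.
func copyWorker(ids <-chan ChunkID, src Store, dst WriteStore, throttlePeriod time.Duration) error {
	for id := range ids {
		time.Sleep(throttlePeriod) // fixed wait; no cross-worker coordination needed
		chunk, err := src.GetChunk(id)
		if err != nil {
			return err
		}
		if err := dst.StoreChunk(chunk); err != nil {
			return err
		}
	}
	return nil
}
```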

@folbricht (Owner):

Having this feature is a good idea; it could be quite useful. But I think it could be implemented a bit more generically.

How about:

  • Make the Limiter a wrapper for a "Store" and "WriteStore". Then you wouldn't need to make any changes to the Copy function at all; the store passed into it would be rate-limited.
  • In the Limiter, use https://pkg.go.dev/golang.org/x/time/rate to do the actual limiting rather than a custom sleep. This would also allow you to limit based on the number of bytes in the chunk. You'd probably have to make sure that the burst size is equal to or bigger than max-chunk-size (see the sketch after this list).
  • You could add separate command-line flags for write-limit and read-limit in bytes per second.
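For instance (an untested sketch; newByteLimiter is a hypothetical helper, not code in this PR):

```go
import "golang.org/x/time/rate"

// newByteLimiter builds a limiter in bytes/second. The burst is set to
// maxChunkSize so a single full-size chunk can always be admitted; callers
// then reserve len(chunkData) tokens via limiter.WaitN(ctx, len(chunkData)).
func newByteLimiter(bytesPerSecond float64, maxChunkSize int) *rate.Limiter {
	return rate.NewLimiter(rate.Limit(bytesPerSecond), maxChunkSize)
}
```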

It'd be nice if we could make this more generic than just the copy function and expand it to all commands, though it might be hard to specify on the command line since there could be several stores involved in a single command. Perhaps we could just expect this to be defined in the store config file (https://github.com/folbricht/desync?tab=readme-ov-file#configuration).

Thoughts?

@bearrito (Contributor, Author) commented Mar 4, 2024:

No problem. I'll obviously defer to your expertise in the matter.
I'll have to look at the code-base more, but I guess I need to figure out how to provide backpressure from the write store.

Edit: Are you good with me limiting this to the LocalStore first?

@folbricht (Owner):

You don't need to change the LocalStore at all; you'll just make a wrapper for any kind of store. The steps would be roughly:

  • Make a new struct like RateLimitStore or similar (in a new file).
  • The struct should have at least these methods https://github.com/folbricht/desync/blob/master/store.go#L16-L20 to implement the Store interface. For a good example, take a look at swapstore.go; it's very close to what you need. Of course yours wouldn't be about swapping the underlying store, but about limiting the GetChunk() method. Initially you could just pass the methods through to the wrapped store.
  • Your struct would have a constructor like func NewRateLimitStore(s Store) *RateLimitStore (see the template after this list).
  • Take a look at how the source store is initialized in https://github.com/folbricht/desync/blob/master/cmd/desync/cache.go#L103-L107; after that line you'd do something like s = NewRateLimitStore(s), which would wrap whatever the read store is. We can later look at how to generalize this more if needed, e.g. if we want to be able to limit the writing part as well.
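A rough template of those steps (untested; just the Store interface methods, everything passed through for now):

```go
// RateLimitStore wraps any Store. Initially each method just passes through
// to the wrapped store; rate limiting gets added to GetChunk() later.
type RateLimitStore struct {
	wrapped Store
}

func NewRateLimitStore(s Store) *RateLimitStore {
	return &RateLimitStore{wrapped: s}
}

func (s *RateLimitStore) GetChunk(id ChunkID) (*Chunk, error) {
	return s.wrapped.GetChunk(id) // limiting goes here
}

func (s *RateLimitStore) HasChunk(id ChunkID) (bool, error) {
	return s.wrapped.HasChunk(id)
}

func (s *RateLimitStore) String() string { return s.wrapped.String() }

func (s *RateLimitStore) Close() error { return s.wrapped.Close() }
```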

Hope that helps; let me know if not and I can whip up a template on a branch which you can then complete.

```go
ctx, cancel := context.WithTimeout(context.Background(), s.options.timeout)
defer cancel()

if s.options.immediateOrFail {
```
@folbricht (Owner):

Do you actually need immediateOrFail? It seems unexpected to cause a failure just because the rate limit was exceeded.

@bearrito (Contributor, Author) commented Mar 10, 2024:

I could go either way; I guess I'd lean towards removal.

Related:
I'm not sure what the overall failure handling should be. I guess a client normally wants to wait, but I don't like naked waits with no ability to time out. Perhaps timeouts should default to something large.

We could have two paths, one using a context with a timeout and one without. But that's another code path.

@folbricht (Owner):

It'd certainly be nice to have a context on all these functions, but that's a major refactoring of the library that should be done in a separate PR.

The wait really depends on the rate limit here, and I don't think it should hang forever. Adding a timeout seems overly complex, and I'd remove it. It might be possible to calculate some upper limit based on the rate limit used and the max chunk size, perhaps.
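For example, with a 1 MiB/s limit and a 16 MiB max chunk size, a single wait should never need to block much longer than about 16 seconds, so something in that order could serve as the computed upper bound.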

@bearrito (Contributor, Author):

To confirm my understanding:

  1. Remove the timeout from the context.
  2. Allow the ops to wait indefinitely, with no timeout.

I'm ultimately only concerned with throttling cache operations against my local store, so I think that's fine for me. However, I haven't looked at the serving code much; is there a chance this affects HTTP handling?

@folbricht (Owner):

Correct, no custom timeout. This can be implemented some day when contexts are added to all store operations.

This shouldn't affect HTTP stores at all, since we're not really limiting the data-rate, but the speed at which whole chunks can be read/stored. So if the rate-limiter allows one chunk to be accessed, that will be transferred at max speed which means the HTTP timeouts apply like normal. Chunk downloading is limited to the rate-limit on average, not for individual chunks.

@bearrito (Contributor, Author):

I added a test with the HTTPHandler. There is a way to reproduce an error:

```
goroutine 125 [select]:
golang.org/x/time/rate.(*Limiter).wait(0xc0001df360, {0xe27338, 0x13ceb80}, 0x1, {0x0?, 0x0?, 0x139e0c0?}, 0xd56f30)
        /home/bstrausser/go/pkg/mod/golang.org/x/[email protected]/rate/rate.go:285 +0x3e5
golang.org/x/time/rate.(*Limiter).WaitN(0x79e845e27a40b76?, {0xe27338, 0x13ceb80}, 0x269063b6f56504d5?)
        /home/bstrausser/go/pkg/mod/golang.org/x/[email protected]/rate/rate.go:248 +0x50
github.com/folbricht/desync.RateLimitedStore.HasChunk({{0xe27dc0, 0xc0003f2b00}, 0xc0001df360, {0x3fc999999999999a, 0x1}}, {0x76, 0xb, 0xa4, 0x27, 0x5e, ...})
        /home/bstrausser/Git/desync/ratelimitstore.go:59 +0x7e
github.com/folbricht/desync.HTTPHandler.head({{{0xceccfb, 0x5}, 0x1, {0x0, 0x0}}, {0xe273e0, 0xc00040cd50}, 0x0, {0xc0003f0990, 0x1, ...}, ...}, ...)
        /home/bstrausser/Git/desync/httphandler.go:77 +0x4e
github.com/folbricht/desync.HTTPHandler.ServeHTTP({{{0xceccfb, 0x5}, 0x1, {0x0, 0x0}}, {0xe273e0, 0xc00040cd50}, 0x0, {0xc0003f0990, 0x1, ...}, ...}, ...)
        /home/bstrausser/Git/desync/httphandler.go:47 +0x266
net/http.serverHandler.ServeHTTP({0xc001bda720?}, {0xe25808?, 0xc00061c2a0?}, 0x6?)
        /usr/local/go/src/net/http/server.go:2938 +0x8e
net/http.(*conn).serve(0xc0006a0c60, {0xe275d8, 0xc00040d1a0})
        /usr/local/go/src/net/http/server.go:2009 +0x5f4
created by net/http.(*Server).Serve in goroutine 9
        /usr/local/go/src/net/http/server.go:3086 +0x5cb
```

@folbricht (Owner):

That's just part of the stacktrace. Is there a repro on your branch? What test can I run to see it?

@bearrito (Contributor, Author):

When I went back to look I couldn't repro.


```go
func (s RateLimitedLocalStore) GetChunk(id ChunkID) (*Chunk, error) {
	return s.wrappedStore.GetChunk(id)
```
@folbricht (Owner):

If you wanted to limit reads as well (and I think we should), it would make sense to first call GetChunk() on the underlying store, then wait after we have the data. That isn't super-precise in terms of limiting since there could be bursts of maxChunkSize * n at startup but over time it should even out.
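Something like this, for example (an untested sketch, reusing the PR's field names):

```go
func (s RateLimitedLocalStore) GetChunk(id ChunkID) (*Chunk, error) {
	// Read first, then wait for the number of bytes actually read. Bursts
	// of up to maxChunkSize * n are possible at startup, but the average
	// converges to the configured rate over time.
	chunk, err := s.wrappedStore.GetChunk(id)
	if err != nil {
		return nil, err
	}
	b, err := chunk.Data()
	if err != nil {
		return nil, err
	}
	// No custom timeout, per the discussion above.
	if err := s.limiter.WaitN(context.Background(), len(b)); err != nil {
		return nil, err
	}
	return chunk, nil
}
```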

@bearrito (Contributor, Author):

Agreed. I'll invert the order and add in support for reads.

```go
	immediateOrFail bool
}

type RateLimitedLocalStore struct {
```
@folbricht (Owner):

Why the Local in the name? This could limit any kind of store.

@bearrito (Contributor, Author):

Correct. I'll fix. I think I was originally wrapping a local store only.

@bearrito (Contributor, Author):

Add throttle on reads.


```go
func (s RateLimitedStore) StoreChunk(chunk *Chunk) error {
	// This isn't ideal, because what I'm really interested in is the size over the wire.
```
@folbricht (Owner):

I think you can actually achieve this by using a limit in "bytes per second" (with a burst equal to or larger than max-chunk-size). Then here in StoreChunk() you can:

```go
b, err := chunk.Data()
if err != nil {
	return err
}
err = s.limiter.WaitN(ctx, len(b))
if err != nil {
	return RateLimitedExceeded
}
return s.wrappedStore.StoreChunk(chunk)
```

It's a little more interesting in GetChunk() since you don't know how large it is. One way would be to WaitN(ctx, <avg-chunk-size>) before calling the wrapped store, or WaitN(ctx, <size of data>) after actually pulling the chunk from the wrapped store. The latter can cause spikes early on when concurrency is used, but should average to the rate you set over time.
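The first variant would just be something like this (avgChunkSize being a placeholder for whatever average the caller configures):

```go
// Wait for an assumed average chunk size before the read. Imprecise for
// any individual chunk, but avoids the startup bursts of the other variant.
if err := s.limiter.WaitN(ctx, avgChunkSize); err != nil {
	return nil, RateLimitedExceeded
}
return s.wrappedStore.GetChunk(id)
```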

As for HasChunk(), I'm not sure you really need to limit it; the request is very small. If you want to limit it anyway, you can just use 1 like you have now, and you could do that before calling the wrappedStore.
