
WIP: Add streaming uploads. #8235

Closed
Conversation

@simi commented Dec 6, 2020

Hello.

This is my initial take on a proposed new method for uploading an IO in chunks. It works with any IO-like object that supports the eof?, pos, and read(bytes) methods.

I'm opening this PR to gauge interest in making this part of the official SDK. There is still some work left, and I left some TODOs as well. Tests and docs are missing for now, but I'm ready to spend more time on this.

Simple example:

# set up an IO object; it can be a File or a StringIO
file_io = File.open('my_file.csv')
string_io = StringIO.new('ABC' * 1_000_000)

# init storage and bucket
storage = Google::Cloud::Storage.new(project_id: "abc", credentials: "../gcs-json-key.json")
bucket = storage.bucket 'abc'

# set up the file object; if needed, a new empty file can be created as well
# via bucket.new_file(StringIO.new(''), 'my_file', acl: 'public_read')
file = bucket.file('my_file', skip_lookup: true)

# get signed url
signed_url = file.signed_url(method: :POST, headers: {'x-goog-resumable' => 'start'}, version: :v4)

# use signed url to upload io
bucket.upload_io(file_io, signed_url)

# content type and chunk size can also be specified
bucket.upload_io(string_io, signed_url, chunk_size: 1024 * 1024, content_type: 'text/csv')

file.reload! # get new metadata including md5 and crc32c hash to verify
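
Under the hood, this relies on GCS's standard resumable upload protocol: a POST to the signed URL with the x-goog-resumable: start header opens an upload session, and the IO is then PUT in Content-Range chunks. Here is a minimal sketch of that flow using Faraday (illustrative only, not the PR's actual code):

require "faraday"

# Illustrative sketch only, not the PR's actual implementation.
# 1. Open an upload session: POST to the signed URL with
#    x-goog-resumable: start; the Location header is the session URI.
resp = Faraday.post(signed_url) do |req|
  req.headers["x-goog-resumable"] = "start"
end
session_url = resp.headers["location"]

# 2. PUT the IO in chunks. Intermediate chunk sizes must be multiples of
#    256 KiB; GCS replies 308 until the final chunk (then 200/201).
chunk_size = 1024 * 1024
offset = 0
until file_io.eof?
  chunk = file_io.read(chunk_size)
  total = file_io.eof? ? offset + chunk.bytesize : "*" # size unknown until EOF
  Faraday.put(session_url) do |req|
    req.headers["Content-Range"] = "bytes #{offset}-#{offset + chunk.bytesize - 1}/#{total}"
    req.body = chunk
  end
  offset += chunk.bytesize
end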

It is also possible to wrap the IO object to build more complex behavior, like a progress bar, or to calculate MD5 and CRC32C on the fly for comparison.

require 'digest/crc32c'
require "google/cloud/storage"
require 'forwardable'

class HashedIO
  extend Forwardable
  def_delegators :@io, :eof?, :pos

  def initialize(io)
    @io = io
    @md5 = Digest::MD5.new
    @crc32 = Digest::CRC32c.new
  end

  def read(chunk_size)
    chunk = @io.read(chunk_size)
    if chunk # IO#read returns nil at EOF
      @md5.update(chunk)
      @crc32.update(chunk)
      print "." # simple progress bar in CLI
    end
    chunk
  end

  def md5
    eof? ? @md5 : raise('Attempted determining MD5 before eof')
  end

  def crc32
    eof? ? @crc32 : raise('Attempted determining CRC32 before eof')
  end
end

storage = Google::Cloud::Storage.new(
  project_id: "rasa-test-1",
  credentials: "../gcs-json-key.json",
)

bucket = storage.bucket 'rasa-as-test'

file_io = File.open('/home/retro/Stažené/COSReport082020-Fuga-Revised_1_ (1).xlsx')
io = HashedIO.new(file_io)

file = bucket.file('my_file', skip_lookup: true)
signed_url = file.signed_url(method: :POST, headers: {'x-goog-resumable' => 'start'}, version: :v4)
print "Uploading"
bucket.upload_io(io, signed_url, chunk_size: 1024 * 1024, content_type: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')

uploaded_file = file.reload!

print "\n"
print "#{uploaded_file.name} was uploaded to #{uploaded_file.bucket} bucket.\n"

print "\nMD5:\n"
print "  uploaded:   #{uploaded_file.md5}\n"
print "  calculated: #{io.md5.base64digest}\n"

if uploaded_file.md5 == io.md5.base64digest
  print "MD5 OK\n"
else
  print "MD5 check failed\n"
end

print "\nCRC32:\n"
print "  uploaded:   #{uploaded_file.crc32c}\n"
print "  calculated: #{io.crc32.base64digest}\n"

if uploaded_file.crc32c == io.crc32.base64digest
  print "CRC32 OK\n"
else
  print "CRC32 check failed\n"
end

print "\n"

[asciicast recording]

google-cla bot added the cla: yes label Dec 6, 2020
@dazuma requested a review from @quartzmo December 7, 2020
@quartzmo self-assigned this Dec 7, 2020
@quartzmo added the api: storage and type: feature request labels Dec 7, 2020
@quartzmo (Member) commented Dec 7, 2020

@simi Thank you so much for this suggestion!

I really like how you use #signed_url to get around the issue in google-api-ruby-client that has blocked us from adding an authenticated streaming upload. However, I'm not sure this method belongs in this library. The core responsibility of the Bucket class and google-cloud-storage in general is to provide authenticated access to the service API. If I'm not mistaken, the code inside Bucket#upload_io works equally well outside of the authenticated context, and depends on Faraday, but otherwise not at all on google-cloud-storage. The #signed_url methods are somewhat similar in that they do not perform RPCs, but they do obtain the issuer and signing_key from the authentication credentials.

I see two alternatives for adding your code to the library:

  • Add it as a usage example in OVERVIEW.md
  • Add it as a utility method that does not require authentication, for example Google::Cloud::Storage::File::Utils.chunked_upload (sketched below)

For the latter, I'd like to get input from some other stakeholders. @dazuma, @frankyn: Any thoughts on this?
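
For illustration, the shape of such a utility might look like the following (the names here are hypothetical, not actual library API):

# Hypothetical shape only, not actual library API.
module Google
  module Cloud
    module Storage
      class File
        module Utils
          # Upload an IO in chunks to a pre-signed resumable URL;
          # no authenticated credentials required.
          def self.chunked_upload io, signed_url, chunk_size: 1024 * 1024, content_type: nil
            # would implement the resumable flow from the example above:
            # POST with x-goog-resumable: start, then PUT Content-Range
            # chunks until io.eof?
          end
        end
      end
    end
  end
end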

@simi (Author) commented Dec 8, 2020

I really like how you use #signed_url to get around the issue in google-api-ruby-client that has blocked us from adding an authenticated streaming upload.

@quartzmo Yes, I have seen the following in the ruby api client README.md:

This means that we will address critical bugs and security issues but will not add any new features.

and decided not to contribute there, since it is not easy to change code in the tree. If I understand it correctly, that code is generated somehow, and it is probably not easy to contribute changes anyway.

If I'm not mistaken, the code inside Bucket#upload_io works equally well outside of the authenticated context, and depends on Faraday, but otherwise not at all on google-cloud-storage.

My code works standalone, that's true. Looking at aws/aws-sdk-ruby#1711, it should be possible to create an upload stream class so that IO.copy_stream can be used instead of this more custom approach, and to move it out of Google::Cloud::Storage::Bucket entirely. That makes sense to me. WDYT?

# using API
storage = Google::Cloud::Storage.new
bucket = storage.bucket 'rasa-as-test'
file = bucket.file('my_file', skip_lookup: true)
signed_url = file.signed_url(method: :POST, headers: {'x-goog-resumable' => 'start'}, version: :v4)

read_stream = StringIO.new('ABC')

# using a utility class; just bring your own signed URL capable of a resumable upload
uploader = Google::Cloud::Storage::ChunkedUploadStream.new(signed_url, content_type: nil, chunk_size: 1024 * 1024)

uploader.upload do |write_stream|
  IO.copy_stream(read_stream, write_stream)
end
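
A rough sketch of how such a ChunkedUploadStream could look (hypothetical; the buffering details are assumptions, reusing the resumable flow from the first example):

require "faraday"

# Hypothetical sketch, not library API. The object responds to #write,
# so it can serve as the destination of IO.copy_stream.
class ChunkedUploadStream
  def initialize(signed_url, content_type: nil, chunk_size: 1024 * 1024)
    @signed_url = signed_url
    @content_type = content_type
    @chunk_size = chunk_size # should be a multiple of 256 KiB
    @buffer = "".b # binary buffer
    @offset = 0
  end

  def upload
    start_session
    yield self # caller streams in, e.g. via IO.copy_stream(read_stream, self)
    put_chunk(@buffer, final: true) # flush the remainder as the final chunk
  end

  # Called by IO.copy_stream; buffer until a full chunk is available.
  def write(data)
    @buffer << data.b
    put_chunk(@buffer.slice!(0, @chunk_size)) while @buffer.bytesize >= @chunk_size
    data.bytesize
  end

  private

  def start_session
    resp = Faraday.post(@signed_url) do |req|
      req.headers["x-goog-resumable"] = "start"
      req.headers["Content-Type"] = @content_type if @content_type
    end
    @session_url = resp.headers["location"]
  end

  def put_chunk(chunk, final: false)
    total = final ? @offset + chunk.bytesize : "*"
    # note: a zero-byte final chunk would need "bytes */#{total}"; elided here
    Faraday.put(@session_url) do |req|
      req.headers["Content-Range"] = "bytes #{@offset}-#{@offset + chunk.bytesize - 1}/#{total}"
      req.body = chunk
    end
    @offset += chunk.bytesize
  end
end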

...works equally well outside of the authenticated context, and depends on Faraday...

Regarding Faraday, it is already an indirect dependency of google-cloud-storage (via google-cloud-core -> google-cloud-env -> faraday, and via google-api-client -> googleauth -> faraday). I used it just because of that. Any HTTP lib could be used to do the same, even the native Ruby one.
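
For example, a single chunk PUT needs nothing beyond the standard library (illustrative; session_url, offset, and chunk come from the resumable flow above):

require "net/http"
require "uri"

# Illustrative: the same chunk PUT using only Ruby's standard library.
uri = URI(session_url)
Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
  req = Net::HTTP::Put.new(uri)
  req["Content-Range"] = "bytes #{offset}-#{offset + chunk.bytesize - 1}/*"
  req.body = chunk
  http.request(req)
end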

Add it as a usage example in OVERVIEW.md

I think any of those solutions should be part of this SDK, since I would like to contribute this capability further upstream (for example to Rails' ActiveJob) to make it easy to use for users of those libraries, and vendoring this kind of code is not usually welcome (the same goes for using an additional 3rd-party gem to handle functionality not supported by the official SDK). Contributing a general utility class like the mentioned ChunkedUploadStream sounds like a good compromise.

@quartzmo (Member) commented Dec 9, 2020

@simi Personally I like your suggestion a lot. I'll try to get some other maintainers to take a look, so please hold on for now.

@dazuma (Member) commented Dec 10, 2020

I like the idea of a separate utility class to handle the upload, as long as it is separable from the storage classes so that it's easy to lift out and contribute upstream if necessary.

@quartzmo requested a review from @dazuma December 10, 2020
@frankyn (Member) commented Dec 11, 2020

Synced with @quartzmo about this FR.

He's going to work it into the google-api-client resumable upload logic. We found that it is possible, but it requires introducing chunking into the upload to handle streams.

We should not merge this PR and instead use it as an example. Thanks @simi for proposing this PR!

@quartzmo (Member) commented Dec 11, 2020
@simi I'm going to close this PR. Thank you so much for prompting a new effort to support streaming uploads in this client stack!

@quartzmo closed this Dec 11, 2020
@simi (Author) commented Jan 13, 2021

Hello, any news on this? I can't find anything related on the google-api-client side.

@simi (Author) commented Apr 13, 2021

@quartzmo pingie.

@quartzmo (Member) commented

As commented on googleapis/google-api-ruby-client#759, this is still blocked as far as I know by googleapis/google-api-ruby-client#2348.

@quartzmo (Member) commented

The code in this PR should still be regarded as a proposed usage sample for the google-cloud-storage library, but not added as a library feature.
