fix(s3): reducing array copying on S3 output stream #549
Conversation
The branch was force-pushed from d7f6530 to 54ae8de (compare).
LGTM
However, in Java, the close method of ByteArrayInputStream has no effect: the methods of this class can be called after the stream has been closed without generating an IOException. This is because a ByteArrayInputStream's data is stored in memory, unlike file or network streams that require actual resource cleanup, so leaving it unclosed should not cause a memory problem.
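A quick illustration of that documented behavior (a standalone snippet, not code from the PR):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;

// ByteArrayInputStream.close() is a documented no-op: the data lives in
// memory, so reads still succeed after closing and never throw.
public class CloseNoOpDemo {
    public static void main(final String[] args) throws IOException {
        final var in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        in.close();
        System.out.println(in.read()); // prints 1, no exception
    }
}
```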
@funky-eyes good catch! I'm adding more changes on how bytes are passed to the request, using byte buffers only instead of array copying. PTAL
LGTM
```diff
-// TODO: get rid of this array copying
-partBuffer.put(source.array(), offset, transferred);
-processedBytes += transferred;
-source.position(source.position() + transferred);
+final ByteBuffer inputBuffer = ByteBuffer.wrap(b, off, len);
+while (inputBuffer.hasRemaining()) {
+    // copy batch to part buffer
+    final int inputLimit = inputBuffer.limit();
+    final int toCopy = Math.min(partBuffer.remaining(), inputBuffer.remaining());
+    final int positionAfterCopying = inputBuffer.position() + toCopy;
+    inputBuffer.limit(positionAfterCopying);
+    partBuffer.put(inputBuffer.slice());
```
Main change: remove the array copying and reuse the input buffer.
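To see the whole pattern in context, here is a minimal sketch of the slicing loop. Everything outside the diff above (the limit restore, the position advance, the field names, and the flushBuffer stub) is an assumption about the surrounding code, not the upstream implementation:

```java
import java.nio.ByteBuffer;

// Sketch of the slice-based write loop; field names and flushBuffer
// behavior are assumptions, not the exact code from the PR.
class PartBufferedStream {
    private final ByteBuffer partBuffer = ByteBuffer.allocate(5 * 1024 * 1024); // e.g. 5 MiB parts
    private long processedBytes = 0;

    public void write(final byte[] b, final int off, final int len) {
        final ByteBuffer inputBuffer = ByteBuffer.wrap(b, off, len);
        while (inputBuffer.hasRemaining()) {
            // copy batch to part buffer
            final int inputLimit = inputBuffer.limit();
            final int toCopy = Math.min(partBuffer.remaining(), inputBuffer.remaining());
            final int positionAfterCopying = inputBuffer.position() + toCopy;
            inputBuffer.limit(positionAfterCopying);
            partBuffer.put(inputBuffer.slice()); // bulk copy of the view, no temporary array
            // restore the limit and step past the batch just copied
            inputBuffer.limit(inputLimit);
            inputBuffer.position(positionAfterCopying);
            if (!partBuffer.hasRemaining()) {
                flushBuffer(); // in the PR this uploads the part; stubbed here
            }
        }
    }

    private void flushBuffer() {
        partBuffer.flip();
        processedBytes += partBuffer.remaining(); // counted on flush, per the discussion below
        partBuffer.clear();                       // stub: real code uploads the part first
    }
}
```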
```diff
-final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
-uploadPart(in, actualPartSize);
-partBuffer.clear();
+try (final InputStream in = new ByteBufferInputStream(buffer)) {
+    processedBytes += actualPartSize;
+    uploadPart(in, actualPartSize);
+} catch (final IOException e) {
+    throw new RuntimeException(e);
+}
```
Also, provide an InputStream directly from the buffer instead of creating a new byte array.
For reference, similar improvements have been implemented in Kafka core: apache/kafka#15589
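For illustration, a ByteBufferInputStream adapter can be as small as the following sketch, modeled on Kafka's org.apache.kafka.common.utils.ByteBufferInputStream; the class actually used in this PR may differ:

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Reads directly from the buffer's current position; no backing-array copy.
public class ByteBufferInputStream extends InputStream {
    private final ByteBuffer buffer;

    public ByteBufferInputStream(final ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public int read() {
        return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
    }

    @Override
    public int read(final byte[] bytes, final int off, final int len) {
        if (len == 0) {
            return 0;
        }
        if (!buffer.hasRemaining()) {
            return -1; // end of stream once the buffer is drained
        }
        final int toRead = Math.min(len, buffer.remaining());
        buffer.get(bytes, off, toRead);
        return toRead;
    }
}
```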
```diff
@@ -50,8 +50,8 @@ public class S3StorageTest extends BaseStorageTest {

     @BeforeAll
     static void setUpClass() {
-        final var clientBuilder = S3Client.builder();
-        clientBuilder.region(Region.of(LOCALSTACK.getRegion()))
+        s3Client = S3Client.builder()
```
nit: it probably doesn't matter for a test, but I think that s3Client needs to be closed in an @AfterAll method to clean up eventual connections/resources.
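Something like the following sketch would cover it (assuming the static s3Client field from the test diff above):

```java
import org.junit.jupiter.api.AfterAll;

// Close the client so pooled HTTP connections are released once the test class finishes.
@AfterAll
static void tearDownClass() {
    if (s3Client != null) {
        s3Client.close();
    }
}
```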
```diff
-try (final var out = s3OutputStream(key)) {
+final var out = s3OutputStream(key);
+try (out) {
     inputStream.transferTo(out);
-    return out.processedBytes();
 } catch (final IOException e) {
     throw new StorageBackendException("Failed to upload " + key, e);
 }
+return out.processedBytes();
```
I do not understand why you did this 😅
Added a comment to clarify a bit, but the main reason this is now outside the try-with-resources block is that processedBytes in the output stream is now counted as part of flushBuffer and no longer in the write method. This is more accurate, as it counts bytes after they are uploaded, whereas previously they were counted when written. Before, the processed bytes were known before closing; now, since the last flushBuffer may happen when the stream is closed, the result is not known until after closing, hence the change.
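Annotated, the ordering constraint looks like this (the same code as the diff above, with explanatory comments added):

```java
final var out = s3OutputStream(key);
try (out) {                        // Java 9+ try-with-resources on an existing variable
    inputStream.transferTo(out);   // may leave a final partial part in the buffer
} catch (final IOException e) {
    throw new StorageBackendException("Failed to upload " + key, e);
}                                  // close() runs the last flushBuffer(), updating processedBytes
return out.processedBytes();       // the count is only complete after close
```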
writeAllMessages is not validating the request bodies properly, the way writesTailMessages does.
Define two buffers: one for the input data being read, and the other for the part-sized upload. Slice them to pass data to I/O (part building and part upload).
looks good 👍
Use only ByteBuffer slices to pass bytes to S3 client operations and remove array copying.
Doing some benchmarks on the current implementation, Arrays.copyOfRange dominates the memory allocation. By switching to the ByteBuffer approach, this copying is removed.
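As a rough illustration of the difference (a hypothetical snippet, not the PR's actual benchmark): the copy-based path allocates a fresh array for every chunk, while the slice-based path only creates small view objects over the same backing array.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class CopyVsSlice {
    // Old style: allocates (to - from) fresh bytes on every call.
    static byte[] copyBased(final byte[] src, final int from, final int to) {
        return Arrays.copyOfRange(src, from, to);
    }

    // New style: a zero-copy view sharing the backing array (Java 13+ overload).
    static ByteBuffer sliceBased(final ByteBuffer src, final int index, final int length) {
        return src.slice(index, length);
    }
}
```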