Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix pool put #1657

Draft
wants to merge 3 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 18 additions & 22 deletions wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package jsbridge

import (
"bytes"
"errors"
"io"
"io/fs"
"syscall/js"
)
Expand All @@ -15,41 +15,37 @@ type FileWriter struct {
uint8Array js.Value
fileHandle js.Value
bufLen int
buf []byte
bufWriteOffset int
buf *bytes.Buffer
writeError bool
}

const writeBlocks = 10
const (
writeBlocks = 25
writeLen = 64 * 1024 * 20
)

// len(p) will always be <= 64KB
func (w *FileWriter) Write(p []byte) (int, error) {
//init buffer if not initialized
if len(w.buf) == 0 {
w.buf = make([]byte, len(p)*writeBlocks)
if w.buf == nil {
w.buf = bytes.NewBuffer(make([]byte, 0, len(p)*writeBlocks))
}

//copy bytes to buf
if w.bufWriteOffset+len(p) > len(w.buf) {
w.writeError = true
return 0, io.ErrShortWrite
}
n := copy(w.buf[w.bufWriteOffset:], p)
w.bufWriteOffset += n
if w.bufWriteOffset == len(w.buf) {
w.buf.Write(p)
if w.buf.Len() > writeLen {
//write to file
if w.bufLen != len(w.buf) {
w.bufLen = len(w.buf)
if w.bufLen != w.buf.Len() {
w.bufLen = w.buf.Len()
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
}
js.CopyBytesToJS(w.uint8Array, w.buf)
js.CopyBytesToJS(w.uint8Array, w.buf.Bytes())
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return 0, errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
w.buf.Reset()
}
return len(p), nil
}
Expand All @@ -71,14 +67,14 @@ func (w *FileWriter) Write(p []byte) (int, error) {

func (w *FileWriter) Close() error {

if w.bufWriteOffset > 0 && !w.writeError {
w.buf = w.buf[:w.bufWriteOffset]
uint8Array := js.Global().Get("Uint8Array").New(len(w.buf))
js.CopyBytesToJS(uint8Array, w.buf)
if w.buf.Len() > 0 && !w.writeError {
uint8Array := js.Global().Get("Uint8Array").New(w.buf.Len())
js.CopyBytesToJS(uint8Array, w.buf.Bytes())
_, err := Await(w.writableStream.Call("write", uint8Array))
if len(err) > 0 && !err[0].IsNull() {
return errors.New("file_writer: " + err[0].String())
}
w.buf.Reset()
}

_, err := Await(w.writableStream.Call("close"))
Expand Down
1 change: 1 addition & 0 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func (su *ChunkedUpload) process() error {
su.statusCallback.Started(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(su.fileMeta.ActualSize)+int(su.fileMeta.ActualThumbnailSize))
}
su.startProcessor()
defer su.chunkReader.Release()
defer su.chunkReader.Close()
defer su.ctxCncl(nil)
for {
Expand Down
7 changes: 6 additions & 1 deletion zboxcore/sdk/chunked_upload_chunk_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type ChunkedUploadChunkReader interface {
GetFileHash() (string, error)
//Reset reset offset
Reset()
//Release release resources
Release()
}

// chunkedUploadChunkReader read chunk bytes from io.Reader. see detail on https://github.com/0chain/blobber/wiki/Protocols#what-is-fixedmerkletree
Expand Down Expand Up @@ -315,7 +317,6 @@ func (r *chunkedUploadChunkReader) Close() {
r.closeOnce.Do(func() {
close(r.hasherDataChan)
r.hasherWG.Wait()
uploadPool.Put(r.fileShardsDataBuffer)
})

}
Expand All @@ -328,6 +329,10 @@ func (r *chunkedUploadChunkReader) GetFileHash() (string, error) {
return r.hasher.GetFileHash()
}

func (r *chunkedUploadChunkReader) Release() {
uploadPool.Put(r.fileShardsDataBuffer)
}

func (r *chunkedUploadChunkReader) hashData() {
defer r.hasherWG.Done()
for data := range r.hasherDataChan {
Expand Down
2 changes: 2 additions & 0 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1344,12 +1344,14 @@ func writeData(dest io.Writer, data [][][]byte, dataShards, remaining int) (int,
n, err := dest.Write(data[i][j])
total += n
if err != nil {
logger.Logger.Error("write failed: ", err, " total: ", total, " remaining: ", remaining, " toWriteData: ", len(data[i][j]))
return total, err
}
} else {
n, err := dest.Write(data[i][j][:remaining])
total += n
if err != nil {
logger.Logger.Error("write failed: ", err, " total: ", total, " remaining: ", remaining)
return total, err
}
}
Expand Down
19 changes: 11 additions & 8 deletions zboxcore/sdk/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/0chain/gosdk/zboxcore/marker"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/minio/sha256-simd"
"go.uber.org/zap"
)

type LatestPrevWriteMarker struct {
Expand Down Expand Up @@ -274,7 +273,7 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) {
if err != nil {
atomic.AddInt32(&errCnt, 1)
markerError = err
l.Logger.Error("error during getWritemarker", zap.Error(err))
l.Logger.Error("error during getWritemarker: ", err)
blobStatus.Status = "unavailable"
}
if wr == nil {
Expand Down Expand Up @@ -326,7 +325,11 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) {
if _, ok := versionMap[version]; !ok {
versionMap[version] = make([]*RollbackBlobber, 0)
}

if rb.lpm.PrevWM != nil {
l.Logger.Info("version ", version, " blobber ", rb.blobber.Baseurl, " timestamp ", rb.lpm.LatestWM.Timestamp, " prev ", rb.lpm.PrevWM.FileMetaRoot)
} else {
l.Logger.Info("version ", version, " blobber ", rb.blobber.Baseurl, " timestamp ", rb.lpm.LatestWM.Timestamp)
}
versionMap[version] = append(versionMap[version], rb)
}

Expand All @@ -346,14 +349,14 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) {
}
return Repair, blobberRes, nil
} else {
l.Logger.Info("versionMapLen", zap.Int("versionMapLen", len(versionMap)), zap.Int("latestLen", len(versionMap[latestVersion])), zap.Int("prevLen", len(versionMap[prevVersion])))
l.Logger.Info("versionMapLen", len(versionMap), " latestLen: ", len(versionMap[latestVersion]), " prevLen: ", len(versionMap[prevVersion]))
}

// rollback to previous version
l.Logger.Info("Rolling back to previous version")
fullConsensus := len(versionMap[latestVersion]) - (req - len(versionMap[prevVersion]))
errCnt = 0
l.Logger.Info("fullConsensus", zap.Int32("fullConsensus", int32(fullConsensus)), zap.Int("latestLen", len(versionMap[latestVersion])), zap.Int("prevLen", len(versionMap[prevVersion])))
l.Logger.Info("fullConsensus ", int32(fullConsensus), " latestLen ", len(versionMap[latestVersion]), " prevLen ", len(versionMap[prevVersion]))
for _, rb := range versionMap[latestVersion] {

wg.Add(1)
Expand All @@ -363,7 +366,7 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) {
if err != nil {
atomic.AddInt32(&errCnt, 1)
rb.commitResult = ErrorCommitResult(err.Error())
l.Logger.Error("error during rollback", zap.Error(err))
l.Logger.Error("error during rollback ", err)
} else {
rb.commitResult = SuccessCommitResult()
}
Expand Down Expand Up @@ -399,7 +402,7 @@ func (a *Allocation) RollbackWithMask(mask zboxutil.Uint128) {
defer wg.Done()
wr, err := GetWritemarker(a.ID, a.Tx, a.sig, blobber.ID, blobber.Baseurl)
if err != nil {
l.Logger.Error("error during getWritemarker", zap.Error(err))
l.Logger.Error("error during getWritemarker: ", err)
}
if wr == nil {
markerChan <- nil
Expand All @@ -426,7 +429,7 @@ func (a *Allocation) RollbackWithMask(mask zboxutil.Uint128) {
err := rb.processRollback(context.TODO(), a.Tx)
if err != nil {
rb.commitResult = ErrorCommitResult(err.Error())
l.Logger.Error("error during rollback", zap.Error(err))
l.Logger.Error("error during rollback: ", err)
} else {
rb.commitResult = SuccessCommitResult()
}
Expand Down
Loading