-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffer.go
97 lines (91 loc) · 1.84 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package span
import (
"io"
"runtime"
"sync"
)
// ErrClosed is returned by WriteAt, and by a repeated Close, once the Stream
// has been closed. It is the same value as io.ErrClosedPipe.
var ErrClosed = io.ErrClosedPipe
// Stream collects byte chunks written at arbitrary absolute offsets (WriteAt)
// and hands them out in offset order through Read. Every method takes mutex
// before touching the other fields.
type Stream struct {
// mutex guards all fields below.
mutex sync.Mutex
// puzzle records which absolute offset ranges have been written so far; each
// WriteAt adds the span [off, off+len(data)) via span.AddTo.
// NOTE(review): ordering/merging of the spans is up to span.AddTo, which is
// defined elsewhere - Missing assumes puzzle[0] is the lowest range.
puzzle []span[int]
// buff holds the bytes not yet consumed by Read. buff[0] is the byte at
// absolute offset off, because Read drops what it returns from the front.
buff []byte
// off is the absolute offset of the next byte Read will deliver.
off int
// closed is set by Close, which also releases buff and puzzle.
closed bool
}
// WriteAt stores data at the absolute stream offset off so that Read can
// deliver it once every byte before it has arrived.
//
// It returns ErrClosed if the stream has been closed. Otherwise it returns
// len(data) and a nil error: bytes that lie before the current read offset
// have already been consumed and are silently discarded, while any part of
// data at or beyond the read offset is kept. An empty data is a no-op.
func (stream *Stream) WriteAt(data []byte, off int) (int, error) {
	stream.mutex.Lock()
	defer stream.mutex.Unlock()

	if stream.closed {
		return 0, ErrClosed
	}
	total := len(data)
	if total == 0 {
		return 0, nil
	}

	// Trim the prefix that Read has already moved past. A chunk that only
	// partly overlaps the consumed region still contributes its unread tail;
	// dropping the whole chunk would lose bytes while reporting success.
	if off < stream.off {
		stale := stream.off - off
		if stale >= total {
			return total, nil
		}
		data = data[stale:]
		off = stream.off
	}

	// buff[0] corresponds to absolute offset stream.off, so all buffer
	// arithmetic must use the offset relative to it. Sizing the buffer from
	// the absolute offset would over-allocate by stream.off bytes after
	// every Read.
	rel := off - stream.off
	if grow := rel + len(data) - len(stream.buff); grow > 0 {
		stream.buff = append(stream.buff, make([]byte, grow)...)
	}

	stream.puzzle = span[int]{off, off + len(data)}.AddTo(stream.puzzle)
	copy(stream.buff[rel:], data)
	return total, nil
}
// Len reports how many bytes are available at the current read offset: the
// distance from that offset to the end of the first recorded span containing
// it. It returns 0 when no span contains the read offset and -1 when the
// stream has been closed.
func (stream *Stream) Len() int {
	stream.mutex.Lock()
	defer stream.mutex.Unlock()

	if stream.closed {
		return -1
	}

	available := 0
	for i := range stream.puzzle {
		if piece := stream.puzzle[i]; piece.Contains(stream.off) {
			// Only the first matching span counts.
			available = piece.End - stream.off
			break
		}
	}
	return available
}
// Close marks the stream as closed and releases its buffer and span list.
// The first call returns nil; every later call returns ErrClosed.
func (stream *Stream) Close() error {
	stream.mutex.Lock()
	defer stream.mutex.Unlock()

	if !stream.closed {
		// Drop the references so the stored data can be garbage collected.
		stream.buff, stream.puzzle = nil, nil
		stream.closed = true
		return nil
	}
	return ErrClosed
}
// Missing returns the end offset of the first recorded span when that span
// starts at offset 0, i.e. the length of the data received without a gap from
// the start of the stream. It returns 0 when nothing has been recorded or the
// first span does not start at 0, and -1 when the stream has been closed.
//
// NOTE(review): this relies on puzzle[0] being the lowest span; that ordering
// is maintained by span.AddTo, which is not visible here.
func (stream *Stream) Missing() int {
	stream.mutex.Lock()
	defer stream.mutex.Unlock()

	switch {
	case stream.closed:
		return -1
	case len(stream.puzzle) == 0:
		return 0
	}

	if first := stream.puzzle[0]; first.Start == 0 {
		return first.End
	}
	return 0
}
// Read copies the bytes available at the current read offset into dst and
// advances the offset by the number of bytes copied. When no recorded span
// contains the read offset it yields the processor and retries until data
// arrives; once the stream is closed it returns 0 and io.EOF.
func (stream *Stream) Read(dst []byte) (int, error) {
	// attempt performs one pass under the lock. done reports whether Read
	// has a result to return or has to poll again.
	attempt := func() (n int, done bool, err error) {
		stream.mutex.Lock()
		defer stream.mutex.Unlock()

		for i := range stream.puzzle {
			piece := stream.puzzle[i]
			if !piece.Contains(stream.off) {
				continue
			}
			// Hand out at most the part of this span that is still unread.
			n = copy(dst, stream.buff[:piece.End-stream.off])
			stream.off += n
			stream.buff = stream.buff[n:]
			// Consumed ranges stay in puzzle; Missing inspects the span
			// that starts at offset 0.
			return n, true, nil
		}
		if stream.closed {
			return 0, true, io.EOF
		}
		return 0, false, nil
	}

	for {
		if n, done, err := attempt(); done {
			return n, err
		}
		// Nothing readable yet: let other goroutines run, then poll again.
		runtime.Gosched()
	}
}