Skip to content

Commit

Permalink
Merge pull request docker#2656 from tonistiigi/repl-stdin
Browse files Browse the repository at this point in the history
build: allow builds from stdin for multi-node builders
  • Loading branch information
thompson-shaun authored Aug 22, 2024
2 parents b6a2c96 + adbcc22 commit e403ab2
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 14 deletions.
2 changes: 1 addition & 1 deletion build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type CallFunc struct {
type Inputs struct {
ContextPath string
DockerfilePath string
InStream io.Reader
InStream *SyncMultiReader
ContextState *llb.State
DockerfileInline string
NamedContexts map[string]NamedContext
Expand Down
35 changes: 23 additions & 12 deletions build/opt.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package build

import (
"bufio"
"bytes"
"context"
"io"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -260,7 +261,7 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
}

so.Exports = opt.Exports
so.Session = opt.Session
so.Session = slices.Clone(opt.Session)

releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so)
if err != nil {
Expand Down Expand Up @@ -364,7 +365,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog

var (
err error
dockerfileReader io.Reader
dockerfileReader io.ReadCloser
dockerfileDir string
dockerfileName = inp.DockerfilePath
toRemove []string
Expand All @@ -382,23 +383,23 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile")
}

buf := bufio.NewReader(inp.InStream)
magic, err := buf.Peek(archiveHeaderSize * 2)
rc := inp.InStream.NewReadCloser()
magic, err := inp.InStream.Peek(archiveHeaderSize * 2)
if err != nil && err != io.EOF {
return nil, errors.Wrap(err, "failed to peek context header from STDIN")
}
if !(err == io.EOF && len(magic) == 0) {
if isArchive(magic) {
// stdin is context
up := uploadprovider.New()
target.FrontendAttrs["context"] = up.Add(buf)
target.FrontendAttrs["context"] = up.Add(rc)
target.Session = append(target.Session, up)
} else {
if inp.DockerfilePath != "" {
return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles")
}
// stdin is dockerfile
dockerfileReader = buf
dockerfileReader = rc
inp.ContextPath, _ = os.MkdirTemp("", "empty-dir")
toRemove = append(toRemove, inp.ContextPath)
if err := setLocalMount("context", inp.ContextPath, target); err != nil {
Expand All @@ -417,7 +418,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
target.SharedKey = sharedKey
switch inp.DockerfilePath {
case "-":
dockerfileReader = inp.InStream
dockerfileReader = inp.InStream.NewReadCloser()
case "":
dockerfileDir = inp.ContextPath
default:
Expand All @@ -426,7 +427,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
}
case IsRemoteURL(inp.ContextPath):
if inp.DockerfilePath == "-" {
dockerfileReader = inp.InStream
dockerfileReader = inp.InStream.NewReadCloser()
} else if filepath.IsAbs(inp.DockerfilePath) {
dockerfileDir = filepath.Dir(inp.DockerfilePath)
dockerfileName = filepath.Base(inp.DockerfilePath)
Expand All @@ -438,11 +439,11 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
}

if inp.DockerfileInline != "" {
dockerfileReader = strings.NewReader(inp.DockerfileInline)
dockerfileReader = io.NopCloser(strings.NewReader(inp.DockerfileInline))
}

if dockerfileReader != nil {
dockerfileDir, err = createTempDockerfile(dockerfileReader)
dockerfileDir, err = createTempDockerfile(dockerfileReader, inp.InStream)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -582,7 +583,7 @@ func setLocalMount(name, dir string, so *client.SolveOpt) error {
return nil
}

func createTempDockerfile(r io.Reader) (string, error) {
func createTempDockerfile(r io.Reader, multiReader *SyncMultiReader) (string, error) {
dir, err := os.MkdirTemp("", "dockerfile")
if err != nil {
return "", err
Expand All @@ -592,6 +593,16 @@ func createTempDockerfile(r io.Reader) (string, error) {
return "", err
}
defer f.Close()

if multiReader != nil {
dt, err := io.ReadAll(r)
if err != nil {
return "", err
}
multiReader.Reset(dt)
r = bytes.NewReader(dt)
}

if _, err := io.Copy(f, r); err != nil {
return "", err
}
Expand Down
164 changes: 164 additions & 0 deletions build/replicatedstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package build

import (
"bufio"
"bytes"
"io"
"sync"
)

// SyncMultiReader hands out several readers over one source stream. Readers
// obtained from NewReadCloser share a single buffer and the source is only
// read again once every open reader has consumed the current buffer.
// Create one with NewSyncMultiReader.
type SyncMultiReader struct {
	// mu guards all fields below; cond is built on mu and is broadcast
	// whenever a reader makes progress, the buffer is refilled, or a reader
	// is closed.
	mu   sync.Mutex
	cond *sync.Cond

	// source is the buffered wrapper around the original input.
	source *bufio.Reader
	// buffer holds the chunk most recently read from source.
	buffer []byte
	// offset is the stream position of buffer[0].
	offset int
	// err is the error returned by the last failed read from source.
	err error

	// readers lists every syncReader created by NewReadCloser.
	readers []*syncReader

	// static, when non-nil, is content installed by Reset; Peek and
	// NewReadCloser use it instead of source.
	static []byte
}

// syncReader is one consumer of a SyncMultiReader's shared stream.
type syncReader struct {
	// mr is the parent whose buffer, lock and condition variable are shared.
	mr *SyncMultiReader
	// closed is set by Close; a closed reader reports io.EOF and is ignored
	// when checking whether other readers have caught up.
	closed bool
	// offset is this reader's position in the stream, counted from the start
	// of the source (not from the start of the current buffer).
	offset int
}

// NewSyncMultiReader wraps source in a SyncMultiReader with an empty shared
// buffer of 32 KiB capacity.
func NewSyncMultiReader(source io.Reader) *SyncMultiReader {
	mr := new(SyncMultiReader)
	mr.source = bufio.NewReader(source)
	mr.buffer = make([]byte, 0, 32*1024)
	// The condition variable shares the reader's own mutex.
	mr.cond = sync.NewCond(&mr.mu)
	return mr
}

// Peek returns up to the first n bytes of the stream without consuming them.
//
// Before Reset has been called the request is forwarded to the underlying
// bufio.Reader, so a short result comes with an error explaining why. After
// Reset the bytes come from the static content; a short result is then
// returned with a nil error. A negative n yields bufio.ErrNegativeCount.
func (mr *SyncMultiReader) Peek(n int) ([]byte, error) {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if n < 0 {
		return nil, bufio.ErrNegativeCount
	}

	if mr.static != nil {
		// Return the head of the static data (not the tail past n), capped so
		// that an append by the caller cannot write into mr.static.
		k := min(n, len(mr.static))
		return mr.static[:k:k], nil
	}

	return mr.source.Peek(n)
}

// Reset installs dt as static content. The slice is stored as-is, without
// copying.
func (mr *SyncMultiReader) Reset(dt []byte) {
	mr.mu.Lock()
	mr.static = dt
	mr.mu.Unlock()
}

// NewReadCloser returns a new reader over the stream. While no static content
// has been installed, the reader is registered with mr and shares the source
// with the other registered readers. Once static content exists, the result is
// an independent, no-op-closing reader over those bytes.
func (mr *SyncMultiReader) NewReadCloser() io.ReadCloser {
	mr.mu.Lock()
	defer mr.mu.Unlock()

	if mr.static == nil {
		sr := &syncReader{mr: mr}
		mr.readers = append(mr.readers, sr)
		return sr
	}

	return io.NopCloser(bytes.NewReader(mr.static))
}

// Read implements io.Reader. It takes the parent's lock and delegates to
// read, which expects the lock to be held.
func (sr *syncReader) Read(p []byte) (int, error) {
	mu := &sr.mr.mu
	mu.Lock()
	defer mu.Unlock()

	return sr.read(p)
}

// read copies the next unread bytes of the shared stream into p. If this
// reader has consumed the whole shared buffer, it waits until every other open
// reader has done the same and then refills the buffer from the source.
//
// The caller must hold sr.mr.mu; the lock is released only while waiting on
// the condition variable. It returns io.EOF when the reader is closed, and the
// source's error once this reader has consumed all data buffered before that
// error occurred.
func (sr *syncReader) read(p []byte) (int, error) {
	mr := sr.mr

	// Stream position just past the shared buffer when this call started.
	// Compared further down to detect a refill done by another reader while
	// this one was waiting.
	entryEnd := mr.offset + len(mr.buffer)

loop0:
	for {
		if sr.closed {
			return 0, io.EOF
		}

		end := mr.offset + len(mr.buffer)

		// The source already failed and nothing buffered is left for us.
		if mr.err != nil && sr.offset == end {
			return 0, mr.err
		}

		// Serve the part of the current buffer this reader has not seen yet.
		start := sr.offset - mr.offset
		if dt := mr.buffer[start:]; len(dt) > 0 {
			n := copy(p, dt)
			sr.offset += n
			// Wake readers that may be waiting for this one to catch up.
			mr.cond.Broadcast()
			return n, nil
		}

		// check for readers that have not caught up
		hasOpen := false
		for _, r := range mr.readers {
			if r.closed {
				continue
			}
			hasOpen = true
			if r.offset < end {
				// Wait releases the lock; state may have changed on wake-up,
				// so start over from the top.
				mr.cond.Wait()
				continue loop0
			}
		}

		if !hasOpen {
			return 0, io.EOF
		}
		break
	}

	last := mr.offset + len(mr.buffer)
	// another reader has already updated the buffer
	if last > entryEnd || mr.err != nil {
		return sr.read(p)
	}

	// Everyone has consumed the buffer: advance the window and refill it.
	mr.offset += len(mr.buffer)

	mr.buffer = mr.buffer[:cap(mr.buffer)]
	n, err := mr.source.Read(mr.buffer)
	// io.Reader guarantees 0 <= n <= len(buffer), so n can be used directly.
	mr.buffer = mr.buffer[:n]

	mr.cond.Broadcast()

	if err != nil {
		mr.err = err
		// A reader may return data together with an error. Only report the
		// error right away when there is no data; otherwise hand out the data
		// first and let the stored error surface on a later call, once this
		// reader has drained the buffer.
		if n == 0 {
			return 0, err
		}
	}

	nn := copy(p, mr.buffer)
	sr.offset += nn

	return nn, nil
}

// Close marks the reader as closed and wakes all waiting readers so they stop
// waiting for this one to catch up. Calling it again is a no-op. It always
// returns nil.
func (sr *syncReader) Close() error {
	sr.mr.mu.Lock()
	defer sr.mr.mu.Unlock()

	if !sr.closed {
		sr.closed = true
		sr.mr.cond.Broadcast()
	}

	return nil
}
77 changes: 77 additions & 0 deletions build/replicatedstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package build

import (
"bytes"
"crypto/rand"
"io"
mathrand "math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// generateRandomData returns size bytes filled from crypto/rand. It panics if
// the random source fails, since the tests cannot proceed without data.
func generateRandomData(size int) []byte {
	data := make([]byte, size)
	if _, err := rand.Read(data); err != nil {
		panic(err)
	}
	return data
}
// TestSyncMultiReaderParallel reads 1 MiB of random data through several
// concurrent readers using random read sizes, random pauses and occasional
// early closes, and checks that every reader sees the source bytes in order.
func TestSyncMultiReaderParallel(t *testing.T) {
	data := generateRandomData(1024 * 1024)
	source := bytes.NewReader(data)
	mr := NewSyncMultiReader(source)

	var wg sync.WaitGroup
	numReaders := 10
	bufferSize := 4096 * 4

	readers := make([]io.ReadCloser, numReaders)

	for i := 0; i < numReaders; i++ {
		readers[i] = mr.NewReadCloser()
	}

	// Unexpected read errors are stored per reader and checked after wg.Wait:
	// require.* stops the test via FailNow, which must be called from the
	// goroutine running the test function, not from the workers below.
	readErrs := make([]error, numReaders)

	for i := 0; i < numReaders; i++ {
		wg.Add(1)
		go func(readerID int) {
			defer wg.Done()
			reader := readers[readerID]
			defer reader.Close()

			totalRead := 0
			buf := make([]byte, bufferSize)
			for totalRead < len(data) {
				// Simulate random read sizes
				readSize := mathrand.Intn(bufferSize) //nolint:gosec
				n, err := reader.Read(buf[:readSize])

				if n > 0 {
					assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerID)
					totalRead += n
				}

				if err == io.EOF {
					assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerID)
					return
				}

				if err != nil {
					// Each worker writes only its own slot; wg.Wait orders
					// the write before the check on the test goroutine.
					readErrs[readerID] = err
					return
				}

				if mathrand.Intn(1000) == 0 { //nolint:gosec
					t.Logf("Reader %d closing", readerID)
					// Simulate random close
					return
				}

				// Simulate random timing between reads
				time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec
			}

			assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerID)
		}(i)
	}

	wg.Wait()

	for readerID, err := range readErrs {
		require.NoError(t, err, "Reader %d error", readerID)
	}
}
2 changes: 1 addition & 1 deletion controller/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
Inputs: build.Inputs{
ContextPath: in.ContextPath,
DockerfilePath: in.DockerfileName,
InStream: inStream,
InStream: build.NewSyncMultiReader(inStream),
NamedContexts: contexts,
},
Ref: in.Ref,
Expand Down

0 comments on commit e403ab2

Please sign in to comment.