Skip to content

Commit

Permalink
feat: add options and usage to cmaf-ingest-receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
tobbee committed Jul 2, 2024
1 parent 4f63a49 commit 40301f1
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 22 deletions.
99 changes: 83 additions & 16 deletions cmd/cmaf-ingest-receiver/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"flag"
"fmt"
"io"
"log/slog"
Expand All @@ -11,22 +12,88 @@ import (
"strings"
"time"

"github.com/Dash-Industry-Forum/livesim2/pkg/logging"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)

const storageRoot = "./storage"
var usg = `Usage of %s:
%s receives files sent to /upload/* and stores them at storage/*.
The files should be uploaded using PUT or POST.
`

type options struct {
port int
storage string
logLevel string
logFormat string
version bool
}

const (
defaultStorageRoot = "./storage"
defaultPort = 8080
defaultLogLevel = "info"
defaultLogFormat = "text"
)

func parseOptions() (*options, error) {
var opts options
flag.IntVar(&opts.port, "port", defaultPort, "HTTP receiver port")
flag.StringVar(&opts.storage, "storage", defaultStorageRoot, "Storage root directory")
flag.StringVar(&opts.logLevel, "loglevel", defaultLogLevel, "Log level (info, debug, warn)")
flag.StringVar(&opts.logFormat, "logformat", defaultLogFormat, "Log format (text, json)")

flag.BoolVar(&opts.version, "version", false, "Get version")

flag.Usage = func() {
parts := strings.Split(os.Args[0], "/")
name := parts[len(parts)-1]
fmt.Fprintf(os.Stderr, usg, name, name)
fmt.Fprintf(os.Stderr, "\nRun as: %s options with options:\n\n", name)
flag.PrintDefaults()
}

flag.Parse()
return &opts, nil
}

func main() {
opts, err := parseOptions()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse options: %v\n", err)
flag.Usage()
os.Exit(1)
}
err = logging.InitSlog(opts.logLevel, opts.logFormat)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to init logging: %v\n", err)
}
r := chi.NewRouter()
r.Use(middleware.Logger)

r.Put("/upload/*", func(w http.ResponseWriter, r *http.Request) {
receiver := makeReceiver(opts)

r.Put("/upload/*", receiver)
r.Post("/upload/*", receiver)

slog.Info("Starting server", "port", opts.port)
err = http.ListenAndServe(fmt.Sprintf(":%d", opts.port), r)
if err != nil {
slog.Error("Server error", "err", err)
os.Exit(1)
}
}

func makeReceiver(opts *options) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
// Extract the path and filename from URL
// Drop the first part that should be /upload or similar.
path := r.URL.Path
parts := strings.Split(path, "/")
filePath := filepath.Join(storageRoot, strings.Join(parts[2:], "/"))
filePath := filepath.Join(opts.storage, strings.Join(parts[2:], "/"))
err := os.MkdirAll(filepath.Dir(filePath), 0755)
if err != nil {
http.Error(w, "Failed to create directory", http.StatusInternalServerError)
Expand All @@ -36,16 +103,14 @@ func main() {
slog.Info("Closing body", "filename", filePath)
r.Body.Close()
}()
fmt.Printf("Headers %+v\n", r.Header)
fmt.Println("Writing to", filePath)
// Read the content from the PUT request
// For low-latency, this will be a stream, so one
// need to have a loop reading
slog.Debug("Headers", "headers", r.Header)
slog.Debug("Output", "filepath", filePath)
bufSize := 65536
buf := make([]byte, bufSize)
ofh, err := os.Create(filePath)
if err != nil {
http.Error(w, "Failed to create file", http.StatusInternalServerError)
slog.Error("Failed to create file", "err", err)
return
}
defer ofh.Close()
Expand All @@ -55,6 +120,7 @@ func main() {
contentLength, err = strconv.Atoi(r.Header.Get("Content-Length"))
if err != nil {
http.Error(w, "Failed to parse Content-Length", http.StatusBadRequest)
slog.Error("Failed to parse Content-Length", "err", err)
return
}
}
Expand All @@ -74,6 +140,11 @@ func main() {
break
}
nrWithoutData++
if nrWithoutData == 10 {
slog.Info("No data for 10 reads, breaking", "nrRead", nrRead)
w.WriteHeader(http.StatusBadRequest)
return
}
time.Sleep(10 * time.Millisecond)
continue
}
Expand All @@ -83,12 +154,14 @@ func main() {
nOut, err := ofh.Write(buf[:n])
if err != nil {
http.Error(w, "Failed to write file", http.StatusInternalServerError)
return
}
slog.Info("wrote bytes", "n", n, "path", path)
slog.Debug("wrote bytes", "n", n, "path", path)
nrWritten += nOut
slog.Info("Reading", "nrRead", nrRead, "contentLength", contentLength)
slog.Debug("Reading", "nrRead", nrRead, "contentLength", contentLength)
if nOut != n {
http.Error(w, "Failed to write all bytes", http.StatusInternalServerError)
return
}
if eof {
break
Expand All @@ -99,11 +172,5 @@ func main() {
slog.Error("write file", "err", err)
}
w.WriteHeader(http.StatusOK)
})

fmt.Println("Starting server on :8080")
err := http.ListenAndServe(":8080", r)
if err != nil {
slog.Error("Failed to start server", "err", err)
}
}
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s=
github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down Expand Up @@ -204,8 +202,6 @@ github.com/mholt/acmez/v2 v2.0.1 h1:3/3N0u1pLjMK4sNEAFSI+bcvzbPhRpY383sy1kLHJ6k=
github.com/mholt/acmez/v2 v2.0.1/go.mod h1:fX4c9r5jYwMyMsC+7tkYRxHibkOTgta5DIFGoe67e1U=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/miekg/dns v1.1.59 h1:C9EXc/UToRwKLhK5wKU/I4QVsBUc8kE6MkHBkeypWZs=
github.com/miekg/dns v1.1.59/go.mod h1:nZpewl5p6IvctfgrckopVx2OlSEHPRO/U4SYkRklrEk=
github.com/miekg/dns v1.1.61 h1:nLxbwF3XxhwVSm8g9Dghm9MHPaUZuqhPiGL+675ZmEs=
github.com/miekg/dns v1.1.61/go.mod h1:mnAarhS3nWaW+NVP2wTkYVIZyHNJ098SJZUki3eykwQ=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down Expand Up @@ -261,8 +257,6 @@ github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQy
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8=
github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
Expand Down

0 comments on commit 40301f1

Please sign in to comment.