Skip to content

Commit

Permalink
FFMPEG progress reader refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
vpoluyaktov committed Nov 15, 2023
1 parent ccaf65e commit b9915b6
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 151 deletions.
10 changes: 10 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
ConcurrentDownloaders int `yaml:"ConcurrentDownloaders"`
ConcurrentEncoders int `yaml:"ConcurrentEncoders"`
ReEncodeFiles bool `yaml:"ReEncodeFiles"`
BasePortNumber int `yaml:"BasePortNumber"`
BitRateKbs int `yaml:"BitRateKbs"`
SampleRateHz int `yaml:"SampleRateHz"`
MaxFileSizeMb int `yaml:"MaxFileSizeMb"`
Expand Down Expand Up @@ -69,6 +70,7 @@ func Load() {
config.ConcurrentDownloaders = 5
config.ConcurrentEncoders = 5
config.ReEncodeFiles = true
config.BasePortNumber = 31000
config.BitRateKbs = 96
config.SampleRateHz = 44100
config.MaxFileSizeMb = 250
Expand Down Expand Up @@ -205,6 +207,14 @@ func (c *Config) IsReEncodeFiles() bool {
return c.ReEncodeFiles
}

func (c *Config) SetBasePortNumber(port int) {
c.BasePortNumber = port
}

func (c *Config) GetBasePortNumber() int {
return c.BasePortNumber
}

func (c *Config) SetBitRate(b int) {
c.BitRateKbs = b
}
Expand Down
143 changes: 71 additions & 72 deletions internal/controller/buildController.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -25,14 +24,18 @@ type BuildController struct {
ab *dto.Audiobook
startTime time.Time
stopFlag bool

// progress tracking arrays
filesBuild []fileBuild
files []fileBuild
}

// progress tracking arrays
type fileBuild struct {
fileId int
progress int
fileName string
totalDuration float64
bytesProcessed int64
secondsProcessed float64
encodingSpeed float64
progress int
complete bool
}

func NewBuildController(dispatcher *mq.Dispatcher) *BuildController {
Expand Down Expand Up @@ -70,6 +73,8 @@ func (c *BuildController) startBuild(cmd *dto.BuildCommand) {
logger.Info(mq.BuildController + " received " + cmd.String())

c.ab = cmd.Audiobook
c.stopFlag = false
c.files = make([]fileBuild, len(c.ab.Parts))

// calculate output file names
for i := range c.ab.Parts {
Expand All @@ -80,6 +85,7 @@ func (c *BuildController) startBuild(cmd *dto.BuildCommand) {
}
part.AACFile = filePath + ".aac"
part.M4BFile = filePath + ".m4b"
c.files[i].totalDuration = part.Duration
}

c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.DisplayBookInfoCommand{Audiobook: c.ab}, true)
Expand All @@ -93,16 +99,12 @@ func (c *BuildController) startBuild(cmd *dto.BuildCommand) {
c.downloadCoverImage(c.ab)

// build audiobook parts
c.stopFlag = false
c.filesBuild = make([]fileBuild, len(c.ab.Parts))

jd := utils.NewJobDispatcher(c.ab.Config.GetConcurrentEncoders())
for i := range c.ab.Parts {
jd.AddJob(i, c.buildAudiobookPart, c.ab, i)
}
go c.updateTotalBuildProgress()
// if c.stopFlag {
// break
// }
go c.updateTotalProgress()
jd.Start()

c.stopFlag = true
Expand Down Expand Up @@ -193,12 +195,16 @@ func (c *BuildController) downloadCoverImage(ab *dto.Audiobook) error {
}

func (c *BuildController) buildAudiobookPart(ab *dto.Audiobook, partId int) {
if c.stopFlag {
return
}

part := &ab.Parts[partId]

// launch progress listener
l, port := c.startProgressListener(partId)
defer l.Close()
go c.updateFileBuildProgress(partId, part.M4BFile, part.Duration, l)
go c.updateFileProgress(partId, l)

// concatenate mp3 files into single .aac file
_, err := ffmpeg.NewFFmpeg().
Expand Down Expand Up @@ -239,95 +245,88 @@ func (c *BuildController) startProgressListener(fileId int) (net.Listener, int)
return l, portNumber
}

func (c *BuildController) updateFileBuildProgress(fileId int, fileName string, totalDuration float64, l net.Listener) {

re := regexp.MustCompile(`out_time_ms=(\d+)`)
func (c *BuildController) updateFileProgress(fileId int, l net.Listener) {
fd, err := l.Accept()
if err != nil {
return // listener is closed
return // listener may be closed already
}
buf := make([]byte, 16)
data := ""
percent := 0
for {

for !c.stopFlag {
_, err := fd.Read(buf)
if err != nil {
return // listener is closed
}
data += string(buf)
a := re.FindAllStringSubmatch(data, -1)
p := 0
pstr := ""
if len(a) > 0 && len(a[len(a)-1]) > 0 {
c, _ := strconv.Atoi(a[len(a)-1][len(a[len(a)-1])-1])
pstr = fmt.Sprintf("%.2f", float64(c)/totalDuration/1000000)
}
if strings.Contains(data, "progress=end") {
p = 100
bytesProcessed, secondsProcessed, encodingSpeed, complete := ffmpeg.ParseFFMPEGProgress(data)
percent = int(secondsProcessed / c.files[fileId].totalDuration * 100)
// wrong calculation protection
if percent < 0 {
percent = 0
} else if percent > 100 {
percent = 100
} else if percent < c.files[fileId].progress {
percent = c.files[fileId].progress
} else if complete {
percent = 100
}
if pstr == "" {
p = 0
}
pflt, err := strconv.ParseFloat(pstr, 64)
if err != nil {
p = 0
} else {
p = int(pflt * 100)
}

if p != percent {
percent = p

// wrong calculation protection
if percent < 0 {
percent = 0
} else if percent > 100 {
percent = 100
}

// sent a message only if progress changed
c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.BuildFileProgress{FileId: fileId, FileName: fileName, Percent: percent}, true)
// sent a message only if progress changed
if percent != c.files[fileId].progress {
c.files[fileId].bytesProcessed = bytesProcessed
c.files[fileId].secondsProcessed = secondsProcessed
c.files[fileId].encodingSpeed = encodingSpeed
c.files[fileId].progress = percent
c.files[fileId].complete = complete
c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.BuildFileProgress{FileId: fileId, FileName: c.files[fileId].fileName, Percent: percent}, true)
}
c.filesBuild[fileId].fileId = fileId
c.filesBuild[fileId].progress = percent
}
}

func (c *BuildController) updateTotalBuildProgress() {
var percent int = -1

for !c.stopFlag && percent <= 100 {
var totalPercent int = 0
files := 0
for _, f := range c.filesBuild {
totalPercent += f.progress
if f.progress == 100 {
files++
func (c *BuildController) updateTotalProgress() {
var p int = -1

for !c.stopFlag && p <= 100 {
var totalDuration float64 = 0
var secondsProcessed float64 = 0
var totalSpeed float64 = 0
filesProcessed := 0
filesComplete := 0
for _, f := range c.files {
totalDuration += f.totalDuration
secondsProcessed += f.secondsProcessed
totalSpeed += f.encodingSpeed
if f.complete {
filesComplete++
}
if f.encodingSpeed > 0 {
filesProcessed++
}
}
p := int(totalPercent / len(c.filesBuild))
percent := int(secondsProcessed / totalDuration * 100)
// wrong calculation protection
if percent < 0 {
percent = 0
} else if percent > 100 {
percent = 100
}

if percent != p {
// sent a message only if progress changed
percent = p

// wrong calculation protection
if percent < 0 {
percent = 0
} else if percent > 100 {
percent = 100
}
p = percent

elapsed := time.Since(c.startTime).Seconds()
speed := int64(float64(percent) / elapsed)
speed := totalSpeed / float64(filesProcessed)
eta := (100 / (float64(percent) / elapsed)) - elapsed
if eta < 0 || eta > (60*60*24*365) {
eta = 0
}

elapsedH := utils.SecondsToTime(elapsed)
filesH := fmt.Sprintf("%d/%d", files, len(c.ab.Parts))
speedH := utils.SpeedToHuman(speed)
filesH := fmt.Sprintf("%d/%d", filesComplete, len(c.ab.Parts))
speedH := fmt.Sprintf("%.0fx", speed)
etaH := utils.SecondsToTime(eta)

c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.BuildProgress{Elapsed: elapsedH, Percent: percent, Files: filesH, Speed: speedH, ETA: etaH}, true)
Expand Down
Loading

0 comments on commit b9915b6

Please sign in to comment.