Skip to content

Commit

Permalink
Copy and Cleanup controllers implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
vpoluyaktov committed Oct 26, 2023
1 parent 3693d19 commit 88dd7a0
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 207 deletions.
14 changes: 12 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
LogFileName string
OutputDir string
LogLevel string
MaxSearchRows int
UseMock bool
SaveMock bool
SearchCondition string
Expand All @@ -44,17 +45,18 @@ func Load() {
config.LogFileName = "abb_ia.log"
config.OutputDir = "output"
config.LogLevel = "INFO"
config.MaxSearchRows = 100
config.UseMock = false
config.SaveMock = false
config.SearchCondition = ""
config.ParrallelDownloads = 5
config.ParrallelEncoders = 5
config.ReEncodeFiles = false
config.ReEncodeFiles = true
config.BitRate = "128k"
config.SampleRate = "44100"
config.MaxFileSize = 1024 * 1024 * 10
config.CopyToAudiobookshelf = true
config.AudiobookshelfDir = "audiobookshelf"
config.AudiobookshelfDir = "/mnt/NAS/Audiobooks/Internet Archive"
config.ShortenTitles = true

fmt.Printf("Using config: %s\n", configFile)
Expand Down Expand Up @@ -118,6 +120,14 @@ func LogLevel() string {
return configInstance.LogLevel
}

func SetMaxSearchRows(r int) {
configInstance.MaxSearchRows = r
}

func MaxSearchRows() int {
return configInstance.MaxSearchRows
}

func UseMock(b bool) {
configInstance.UseMock = b
}
Expand Down
168 changes: 11 additions & 157 deletions internal/controller/buildController.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"bufio"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -30,32 +29,13 @@ type BuildController struct {

// progress tracking arrays
filesBuild []fileBuild
filesCopy []fileCopy
}

type fileBuild struct {
fileId int
progress int
}

type fileCopy struct {
fileId int
fileSize int64
bytesCopied int64
progress int
}

// Progress Reader for file copy progress
type Fn func(fileId int, fileName string, size int64, pos int64, percent int)
type ProgressReader struct {
FileId int
FileName string
Reader io.Reader
Size int64
Pos int64
Percent int
Callback Fn
}

func NewBuildController(dispatcher *mq.Dispatcher) *BuildController {
dc := &BuildController{}
Expand Down Expand Up @@ -127,19 +107,6 @@ func (c *BuildController) startBuild(cmd *dto.BuildCommand) {
// }
jd.Start()

// copy book to Audiobookshelf
if config.IsCopyToAudiobookshelf() {
c.filesCopy = make([]fileCopy, len(c.ab.Parts))
jd := utils.NewJobDispatcher(1)
for i := range c.ab.Parts {
jd.AddJob(i, c.copyAudiobookPart, c.ab, i)
}
go c.updateTotalCopyProgress()
jd.Start()
}

c.cleanUp(c.ab)

c.mq.SendMessage(mq.BuildController, mq.Footer, &dto.SetBusyIndicator{Busy: false}, false)
c.mq.SendMessage(mq.BuildController, mq.Footer, &dto.UpdateStatus{Message: ""}, false)
c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.BuildComplete{Audiobook: cmd.Audiobook}, true)
Expand Down Expand Up @@ -261,54 +228,6 @@ func (c *BuildController) buildAudiobookPart(ab *dto.Audiobook, partId int) {
}
}

func (c *BuildController) copyAudiobookPart(ab *dto.Audiobook, partId int) {

part := &ab.Parts[partId]

file, err := os.Open(part.M4BFile)
if err != nil {
logger.Error("Can't open .mb4 file: " + err.Error())
return
}
fileReader := bufio.NewReader(file)
defer file.Close()

destPath := filepath.Clean(filepath.Join(config.AudiobookshelfDir(), ab.Author, ab.Title, ab.Series, filepath.Base(part.M4BFile)))
destDir := filepath.Dir(destPath)

if err := os.MkdirAll(destDir, 0750); err != nil {
logger.Error("Can't create output directory: " + err.Error())
return
}
f, err := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
logger.Fatal("Can't create temporary file: " + err.Error())
return
}
defer f.Close()

progressReader := &ProgressReader{
FileId: partId,
FileName: part.M4BFile,
Reader: fileReader,
Size: part.Size,
Callback: c.updateFileCopyProgress,
}

if _, err := io.Copy(f, progressReader); err != nil {
logger.Error("Error while copying .m4b file: " + err.Error())
}
}

func (pr *ProgressReader) Read(p []byte) (int, error) {
n, err := pr.Reader.Read(p)
if err == nil {
pr.Pos += int64(n)
pr.Percent = int(float64(pr.Pos) / float64(pr.Size) * 100)
pr.Callback(pr.FileId, pr.FileName, pr.Size, pr.Pos, pr.Percent)
}
return n, err
}

func (c *BuildController) startProgressListener(fileId int) (net.Listener, int) {

Expand Down Expand Up @@ -360,9 +279,14 @@ func (c *BuildController) updateFileBuildProgress(fileId int, fileName string, t

if p != percent {
percent = p
if percent > 100 {

// wrong calculation protection
if percent < 0 {
percent = 0
} else if percent > 100 {
percent = 100
}

// sent a message only if progress changed
c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.BuildFileProgress{FileId: fileId, FileName: fileName, Percent: percent}, true)
}
Expand All @@ -388,7 +312,11 @@ func (c *BuildController) updateTotalBuildProgress() {
if percent != p {
// sent a message only if progress changed
percent = p
if percent > 100 {

// wrong calculation protection
if percent < 0 {
percent = 0
} else if percent > 100 {
percent = 100
}

Expand All @@ -410,77 +338,3 @@ func (c *BuildController) updateTotalBuildProgress() {
}
}

func (c *BuildController) updateFileCopyProgress(fileId int, fileName string, size int64, pos int64, percent int) {
if c.filesBuild[fileId].progress != percent {
if percent > 100 {
percent = 100
}
// sent a message only if progress changed
c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.CopyFileProgress{FileId: fileId, FileName: fileName, Percent: percent}, false)
}
c.filesCopy[fileId].fileId = fileId
c.filesCopy[fileId].fileSize = size
c.filesCopy[fileId].bytesCopied = pos
c.filesCopy[fileId].progress = percent
}

func (c *BuildController) updateTotalCopyProgress() {
var percent int = -1

item := c.ab.IAItem
for !c.stopFlag && percent <= 100 {
var totalSize = item.TotalSize
var totalBytesDownloaded int64 = 0
filesDownloaded := 0
for _, f := range c.filesCopy {
totalBytesDownloaded += f.bytesCopied
if f.progress == 100 {
filesDownloaded++
}
}

var p int = 0
if totalSize > 0 {
p = int(float64(totalBytesDownloaded) / float64(totalSize) * 100)
}

// fix wrong file size returned by IA metadata
if filesDownloaded == len(c.filesBuild) {
p = 100
totalBytesDownloaded = item.TotalSize
}

if percent != p {
// sent a message only if progress changed
percent = p

elapsed := time.Since(c.startTime).Seconds()
speed := int64(float64(totalBytesDownloaded) / elapsed)
eta := (100 / (float64(percent) / elapsed)) - elapsed
if eta < 0 || eta > (60*60*24*365) {
eta = 0
}

elapsedH := utils.SecondsToTime(elapsed)
bytesH := utils.BytesToHuman(totalBytesDownloaded)
filesH := fmt.Sprintf("%d/%d", filesDownloaded, len(item.AudioFiles))
speedH := utils.SpeedToHuman(speed)
etaH := utils.SecondsToTime(eta)

c.mq.SendMessage(mq.BuildController, mq.BuildPage, &dto.CopyProgress{Elapsed: elapsedH, Percent: percent, Files: filesH, Bytes: bytesH, Speed: speedH, ETA: etaH}, false)
}
time.Sleep(mq.PullFrequency)
}
}

func (c *BuildController) cleanUp(ab *dto.Audiobook) {
if !(config.IsSaveMock() || config.IsUseMock()) {
os.RemoveAll(ab.OutputDir)
}
for _, part := range ab.Parts {
os.Remove(part.AACFile)
os.Remove(part.FListFile)
os.Remove(part.MetadataFile)
}
os.Remove(ab.CoverFile)
}
56 changes: 56 additions & 0 deletions internal/controller/cleanupController.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package controller

import (
"os"

"github.com/vpoluyaktov/abb_ia/internal/config"
"github.com/vpoluyaktov/abb_ia/internal/dto"
"github.com/vpoluyaktov/abb_ia/internal/logger"
"github.com/vpoluyaktov/abb_ia/internal/mq"
)

type CleanupController struct {
mq *mq.Dispatcher
ab *dto.Audiobook
}

func NewCleanupController(dispatcher *mq.Dispatcher) *CleanupController {
dc := &CleanupController{}
dc.mq = dispatcher
dc.mq.RegisterListener(mq.CleanupController, dc.dispatchMessage)
return dc
}

func (c *CleanupController) checkMQ() {
m := c.mq.GetMessage(mq.CleanupController)
if m != nil {
c.dispatchMessage(m)
}
}

func (c *CleanupController) dispatchMessage(m *mq.Message) {
switch dto := m.Dto.(type) {
case *dto.CleanupCommand:
go c.cleanUp(dto)
default:
m.UnsupportedTypeError(mq.CleanupController)
}
}

func (c *CleanupController) cleanUp(cmd *dto.CleanupCommand) {
logger.Info(mq.CleanupController + " received " + cmd.String())
c.ab = cmd.Audiobook

if !(config.IsSaveMock() || config.IsUseMock()) {
os.RemoveAll(c.ab.OutputDir)
}
for _, part := range c.ab.Parts {
os.Remove(part.AACFile)
os.Remove(part.FListFile)
os.Remove(part.MetadataFile)
if config.IsCopyToAudiobookshelf() {
os.Remove(part.M4BFile)
}
}
os.Remove(c.ab.CoverFile)
}
2 changes: 2 additions & 0 deletions internal/controller/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func NewConductor(dispatcher *mq.Dispatcher) *Conductor {
c.controllers = append(c.controllers, NewEncodingController(c.dispatcher))
c.controllers = append(c.controllers, NewChaptersController(c.dispatcher))
c.controllers = append(c.controllers, NewBuildController(c.dispatcher))
c.controllers = append(c.controllers, NewCopyController(c.dispatcher))
c.controllers = append(c.controllers, NewCleanupController(c.dispatcher))
return c
}

Expand Down
Loading

0 comments on commit 88dd7a0

Please sign in to comment.