Enable parallel downloads of updates and fix EOF when unpacking archives

This commit is contained in:
Patrick Pacher 2022-03-02 12:03:46 +01:00 committed by GitHub
parent 9504c41702
commit 7f87e417d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 22 deletions

View file

@ -3,6 +3,7 @@ package updater
import ( import (
"archive/zip" "archive/zip"
"compress/gzip" "compress/gzip"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -36,7 +37,10 @@ func (reg *ResourceRegistry) UnpackResources() error {
if utils.StringInSlice(reg.AutoUnpack, res.Identifier) { if utils.StringInSlice(reg.AutoUnpack, res.Identifier) {
err := res.UnpackArchive() err := res.UnpackArchive()
if err != nil { if err != nil {
multierr = multierror.Append(multierr, err) multierr = multierror.Append(
multierr,
fmt.Errorf("%s: %w", res.Identifier, err),
)
} }
} }
} }
@ -69,7 +73,7 @@ func (res *Resource) UnpackArchive() error {
} }
} }
func (res *Resource) unpackZipArchive() (err error) { func (res *Resource) unpackZipArchive() error {
// Get file and directory paths. // Get file and directory paths.
archiveFile := res.SelectedVersion.storagePath() archiveFile := res.SelectedVersion.storagePath()
destDir := strings.TrimSuffix(archiveFile, zipSuffix) destDir := strings.TrimSuffix(archiveFile, zipSuffix)
@ -115,7 +119,7 @@ func (res *Resource) unpackZipArchive() (err error) {
var archiveReader *zip.ReadCloser var archiveReader *zip.ReadCloser
archiveReader, err = zip.OpenReader(archiveFile) archiveReader, err = zip.OpenReader(archiveFile)
if err != nil { if err != nil {
return return fmt.Errorf("failed to open zip reader: %w", err)
} }
defer func() { defer func() {
_ = archiveReader.Close() _ = archiveReader.Close()
@ -128,20 +132,20 @@ func (res *Resource) unpackZipArchive() (err error) {
filepath.Join(tmpDir, filepath.FromSlash(file.Name)), filepath.Join(tmpDir, filepath.FromSlash(file.Name)),
) )
if err != nil { if err != nil {
return return fmt.Errorf("failed to extract archive file %s: %w", file.Name, err)
} }
} }
// Make the final move. // Make the final move.
err = os.Rename(tmpDir, destDir) err = os.Rename(tmpDir, destDir)
if err != nil { if err != nil {
return return fmt.Errorf("failed to move the extracted archive from %s to %s: %w", tmpDir, destDir, err)
} }
// Fix permissions on the destination dir. // Fix permissions on the destination dir.
err = res.registry.storageDir.EnsureAbsPath(destDir) err = res.registry.storageDir.EnsureAbsPath(destDir)
if err != nil { if err != nil {
return return fmt.Errorf("failed to apply directory permissions on %s: %w", destDir, err)
} }
log.Infof("%s: unpacked %s", res.registry.Name, res.SelectedVersion.versionedPath()) log.Infof("%s: unpacked %s", res.registry.Name, res.SelectedVersion.versionedPath())
@ -153,7 +157,7 @@ func copyFromZipArchive(archiveFile *zip.File, dstPath string) error {
if archiveFile.FileInfo().IsDir() { if archiveFile.FileInfo().IsDir() {
err := os.Mkdir(dstPath, archiveFile.Mode()) err := os.Mkdir(dstPath, archiveFile.Mode())
if err != nil { if err != nil {
return err return fmt.Errorf("failed to create directory %s: %w", dstPath, err)
} }
return nil return nil
} }
@ -161,7 +165,7 @@ func copyFromZipArchive(archiveFile *zip.File, dstPath string) error {
// Open archived file for reading. // Open archived file for reading.
fileReader, err := archiveFile.Open() fileReader, err := archiveFile.Open()
if err != nil { if err != nil {
return err return fmt.Errorf("failed to open file in archive: %w", err)
} }
defer func() { defer func() {
_ = fileReader.Close() _ = fileReader.Close()
@ -170,7 +174,7 @@ func copyFromZipArchive(archiveFile *zip.File, dstPath string) error {
// Open destination file for writing. // Open destination file for writing.
dstFile, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, archiveFile.Mode()) dstFile, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, archiveFile.Mode())
if err != nil { if err != nil {
return err return fmt.Errorf("failed to open destination file %s: %w", dstPath, err)
} }
defer func() { defer func() {
_ = dstFile.Close() _ = dstFile.Close()
@ -178,6 +182,11 @@ func copyFromZipArchive(archiveFile *zip.File, dstPath string) error {
// Copy full file from archive to dst. // Copy full file from archive to dst.
if _, err := io.CopyN(dstFile, fileReader, MaxUnpackSize); err != nil { if _, err := io.CopyN(dstFile, fileReader, MaxUnpackSize); err != nil {
// EOF is expected here as the archive is likely smaller
// thane MaxUnpackSize
if errors.Is(err, io.EOF) {
return nil
}
return err return err
} }

View file

@ -9,6 +9,7 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"github.com/safing/portbase/log" "github.com/safing/portbase/log"
"github.com/safing/portbase/utils" "github.com/safing/portbase/utils"
@ -135,26 +136,43 @@ func (reg *ResourceRegistry) DownloadUpdates(ctx context.Context) error {
} }
// check download dir // check download dir
err := reg.tmpDir.Ensure() if err := reg.tmpDir.Ensure(); err != nil {
if err != nil {
return fmt.Errorf("could not prepare tmp directory for download: %w", err) return fmt.Errorf("could not prepare tmp directory for download: %w", err)
} }
// download updates // download updates
log.Infof("%s: starting to download %d updates", reg.Name, len(toUpdate)) log.Infof("%s: starting to download %d updates parallel", reg.Name, len(toUpdate))
var wg sync.WaitGroup
wg.Add(len(toUpdate))
client := &http.Client{} client := &http.Client{}
for _, rv := range toUpdate {
for tries := 0; tries < 3; tries++ { for idx := range toUpdate {
err = reg.fetchFile(ctx, client, rv, tries) go func(rv *ResourceVersion) {
if err == nil { var err error
rv.Available = true
break defer wg.Done()
defer func() {
if x := recover(); x != nil {
log.Errorf("%s: %s: captured panic: %s", reg.Name, rv.resource.Identifier, x)
}
}()
for tries := 0; tries < 3; tries++ {
err = reg.fetchFile(ctx, client, rv, tries)
if err == nil {
rv.Available = true
return
}
} }
} if err != nil {
if err != nil { log.Warningf("%s: failed to download %s version %s: %s", reg.Name, rv.resource.Identifier, rv.VersionNumber, err)
log.Warningf("%s: failed to download %s version %s: %s", reg.Name, rv.resource.Identifier, rv.VersionNumber, err) }
} }(toUpdate[idx])
} }
wg.Wait()
log.Infof("%s: finished downloading updates", reg.Name) log.Infof("%s: finished downloading updates", reg.Name)
return nil return nil