[WIP] Improve downloader resilience

This commit is contained in:
Vladimir Stoilov 2024-10-07 16:00:30 +03:00
parent 19422726fe
commit a79be8b6a9
No known key found for this signature in database
GPG key ID: 2F190B67A43A81AF
4 changed files with 130 additions and 83 deletions

View file

@ -46,25 +46,40 @@ type Bundle struct {
Artifacts []Artifact `json:"Artifacts"` Artifacts []Artifact `json:"Artifacts"`
} }
func ParseBundle(indexFile string) (*Bundle, error) { // LoadBundle loads and parses a bundle from filepath.
// Check if the file exists. func LoadBundle(indexFilepath string) (*Bundle, error) {
file, err := os.Open(indexFile)
if err != nil {
return nil, fmt.Errorf("failed to open index file: %w", err)
}
defer func() { _ = file.Close() }()
// Read // Read
content, err := io.ReadAll(file) content, err := os.ReadFile(indexFilepath)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to read index file: %w", err)
} }
// Parse // Parse
var bundle Bundle var bundle Bundle
err = json.Unmarshal(content, &bundle) err = json.Unmarshal(content, &bundle)
if err != nil { if err != nil {
return nil, fmt.Errorf("%s %w", indexFile, err) return nil, fmt.Errorf("%s %w", indexFilepath, err)
}
// Filter artifacts
filtered := make([]Artifact, 0)
for _, a := range bundle.Artifacts {
if a.Platform == "" || a.Platform == currentPlatform {
filtered = append(filtered, a)
}
}
bundle.Artifacts = filtered
return &bundle, nil
}
// ParseBundle parses a bundle from json string.
func ParseBundle(jsonContent string) (*Bundle, error) {
// Parse
var bundle Bundle
err := json.Unmarshal([]byte(jsonContent), &bundle)
if err != nil {
return nil, fmt.Errorf("failed to parse bundle: %w", err)
} }
// Filter artifacts // Filter artifacts
@ -80,7 +95,7 @@ func ParseBundle(indexFile string) (*Bundle, error) {
} }
// Verify checks if the files are present int the dataDir and have the correct hash. // Verify checks if the files are present int the dataDir and have the correct hash.
func (bundle Bundle) Verify(dir string) error { func (bundle *Bundle) Verify(dir string) error {
for _, artifact := range bundle.Artifacts { for _, artifact := range bundle.Artifacts {
artifactPath := filepath.Join(dir, artifact.Filename) artifactPath := filepath.Join(dir, artifact.Filename)
isValid, err := checkIfFileIsValid(artifactPath, artifact) isValid, err := checkIfFileIsValid(artifactPath, artifact)

View file

@ -140,7 +140,7 @@ func getSHA256(path string, unpackType string) (string, error) {
// Decompress if compression was applied to the file. // Decompress if compression was applied to the file.
if unpackType != "" { if unpackType != "" {
content, err = unpack(unpackType, content) content, err = decompress(unpackType, content)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -151,10 +151,7 @@ func getSHA256(path string, unpackType string) (string, error) {
return hex.EncodeToString(hash[:]), nil return hex.EncodeToString(hash[:]), nil
} }
var ( var fileVersionRegex = regexp.MustCompile(`_v[0-9]+-[0-9]+-[0-9]+(-[a-z]+)?`)
fileVersionRegex = regexp.MustCompile(`_v[0-9]+-[0-9]+-[0-9]+(-[a-z]+)?`)
rawVersionRegex = regexp.MustCompile(`^[0-9]+\.[0-9]+\.[0-9]+(-[a-z]+)?$`)
)
func getIdentifierAndVersion(versionedPath string) (identifier, version string, ok bool) { func getIdentifierAndVersion(versionedPath string) (identifier, version string, ok bool) {
dirPath, filename := path.Split(versionedPath) dirPath, filename := path.Split(versionedPath)

View file

@ -38,25 +38,51 @@ func CreateDownloader(index UpdateIndex) Downloader {
} }
} }
func (d *Downloader) downloadIndexFile(ctx context.Context) (err error) { func (d *Downloader) downloadIndexFile(ctx context.Context) error {
// Make sure dir exists // Make sure dir exists
_ = os.MkdirAll(d.dir, defaultDirMode) _ = os.MkdirAll(d.dir, defaultDirMode)
var err error
var content string
for _, url := range d.indexURLs { for _, url := range d.indexURLs {
err = d.downloadIndexFileFromURL(ctx, url) content, err = d.downloadIndexFileFromURL(ctx, url)
if err != nil { if err != nil {
log.Warningf("updates: failed while downloading index file %s", err) log.Warningf("updates: failed while downloading index file %s", err)
continue continue
} }
// Downloading was successful. // Downloading was successful.
bundle, err := ParseBundle(content)
if err != nil {
log.Warningf("updates: %s", err)
continue
}
// Parsing was successful
version, err := semver.NewVersion(d.bundle.Version)
if err != nil {
log.Warningf("updates: failed to parse bundle version: %s", err)
continue
}
// All checks passed. Set and exit the loop.
d.bundle = bundle
d.version = version
err = nil err = nil
break break
} }
if err == nil { if err != nil {
err = d.parseBundle() return err
} }
return // Write the content into a file.
indexFilepath := filepath.Join(d.dir, d.indexFile)
err = os.WriteFile(indexFilepath, []byte(content), defaultFileMode)
if err != nil {
return fmt.Errorf("failed to write index file: %s", err)
}
return nil
} }
// Verify verifies if the downloaded files match the corresponding hash. // Verify verifies if the downloaded files match the corresponding hash.
@ -72,7 +98,7 @@ func (d *Downloader) Verify() error {
func (d *Downloader) parseBundle() error { func (d *Downloader) parseBundle() error {
indexFilepath := filepath.Join(d.dir, d.indexFile) indexFilepath := filepath.Join(d.dir, d.indexFile)
var err error var err error
d.bundle, err = ParseBundle(indexFilepath) d.bundle, err = LoadBundle(indexFilepath)
if err != nil { if err != nil {
return err return err
} }
@ -84,11 +110,11 @@ func (d *Downloader) parseBundle() error {
return nil return nil
} }
func (d *Downloader) downloadIndexFileFromURL(ctx context.Context, url string) error { func (d *Downloader) downloadIndexFileFromURL(ctx context.Context, url string) (string, error) {
// Request the index file // Request the index file
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil { if err != nil {
return fmt.Errorf("failed to create GET request to %s: %w", url, err) return "", fmt.Errorf("failed to create GET request to %s: %w", url, err)
} }
if UserAgent != "" { if UserAgent != "" {
req.Header.Set("User-Agent", UserAgent) req.Header.Set("User-Agent", UserAgent)
@ -97,29 +123,22 @@ func (d *Downloader) downloadIndexFileFromURL(ctx context.Context, url string) e
// Perform request // Perform request
resp, err := d.httpClient.Do(req) resp, err := d.httpClient.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("failed GET request to %s: %w", url, err) return "", fmt.Errorf("failed GET request to %s: %w", url, err)
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = resp.Body.Close() }()
// Check the status code // Check the status code
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("received error from the server status code: %s", resp.Status) return "", fmt.Errorf("received error from the server status code: %s", resp.Status)
}
// Create file
indexFilepath := filepath.Join(d.dir, d.indexFile)
file, err := os.Create(indexFilepath)
if err != nil {
return err
}
defer func() { _ = file.Close() }()
// Write body of the response
_, err = io.Copy(file, resp.Body)
if err != nil {
return err
} }
return nil // Read the content.
content, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(content), nil
} }
// CopyMatchingFilesFromCurrent check if there the current bundle files has matching files with the new bundle and copies them if they match. // CopyMatchingFilesFromCurrent check if there the current bundle files has matching files with the new bundle and copies them if they match.
@ -193,27 +212,13 @@ func (d *Downloader) processArtifact(ctx context.Context, artifact Artifact, fil
return fmt.Errorf("invalid provided hash %s: %w", artifact.SHA256, err) return fmt.Errorf("invalid provided hash %s: %w", artifact.SHA256, err)
} }
// Download // Download and verify
log.Debugf("updates: downloading file: %s", artifact.Filename) log.Debugf("updates: downloading file: %s", artifact.Filename)
content, err := d.downloadFile(ctx, artifact.URLs) content, err := d.downloadAndVerifyArtifact(ctx, artifact.URLs, artifact.Unpack, providedHash)
if err != nil { if err != nil {
return fmt.Errorf("failed to download artifact: %w", err) return fmt.Errorf("failed to download artifact: %w", err)
} }
// Decompress
if artifact.Unpack != "" {
content, err = unpack(artifact.Unpack, content)
if err != nil {
return fmt.Errorf("failed to decompress artifact: %w", err)
}
}
// Verify
hash := sha256.Sum256(content)
if !bytes.Equal(providedHash, hash[:]) {
return fmt.Errorf("failed to verify artifact: %s", artifact.Filename)
}
// Save // Save
tmpFilename := fmt.Sprintf("%s.download", filePath) tmpFilename := fmt.Sprintf("%s.download", filePath)
err = os.WriteFile(tmpFilename, content, artifact.GetFileMode()) err = os.WriteFile(tmpFilename, content, artifact.GetFileMode())
@ -232,39 +237,69 @@ func (d *Downloader) processArtifact(ctx context.Context, artifact Artifact, fil
return nil return nil
} }
func (d *Downloader) downloadFile(ctx context.Context, urls []string) ([]byte, error) { func (d *Downloader) downloadAndVerifyArtifact(ctx context.Context, urls []string, unpack string, expectedHash []byte) ([]byte, error) {
var err error
var content []byte
for _, url := range urls { for _, url := range urls {
// Try to make the request // Download
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) content, err = d.downloadFile(ctx, url)
if err != nil { if err != nil {
log.Warningf("failed to create GET request to %s: %s", url, err) err := fmt.Errorf("failed to download artifact from url: %s, %w", url, err)
continue log.Warningf("%s", err)
}
if UserAgent != "" {
req.Header.Set("User-Agent", UserAgent)
}
resp, err := d.httpClient.Do(req)
if err != nil {
log.Warningf("failed a get file request to: %s", err)
continue
}
defer func() { _ = resp.Body.Close() }()
// Check if the server returned an error
if resp.StatusCode != http.StatusOK {
log.Warningf("server returned non-OK status: %d %s", resp.StatusCode, resp.Status)
continue continue
} }
content, err := io.ReadAll(resp.Body) // Decompress
if err != nil { if unpack != "" {
log.Warningf("failed to read body of response: %s", err) content, err = decompress(unpack, content)
if err != nil {
err = fmt.Errorf("failed to decompress artifact: %w", err)
log.Warningf("%s", err)
continue
}
}
// Calculate and verify hash
hash := sha256.Sum256(content)
if !bytes.Equal(expectedHash, hash[:]) {
err := fmt.Errorf("artifact hash does not match")
log.Warningf("%s", err)
continue continue
} }
// All file downloaded and verified.
return content, nil return content, nil
} }
return nil, fmt.Errorf("failed to download file from the provided urls") return nil, err
}
func (d *Downloader) downloadFile(ctx context.Context, url string) ([]byte, error) {
// Try to make the request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
return nil, fmt.Errorf("failed to create GET request to %s: %s", url, err)
}
if UserAgent != "" {
req.Header.Set("User-Agent", UserAgent)
}
resp, err := d.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed a get file request to: %s", err)
}
defer func() { _ = resp.Body.Close() }()
// Check if the server returned an error
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned non-OK status: %d %s", resp.StatusCode, resp.Status)
}
content, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read body of response: %s", err)
}
return content, nil
} }
func (d *Downloader) deleteUnfinishedDownloads() error { func (d *Downloader) deleteUnfinishedDownloads() error {
@ -286,7 +321,7 @@ func (d *Downloader) deleteUnfinishedDownloads() error {
return nil return nil
} }
func unpack(cType string, fileBytes []byte) ([]byte, error) { func decompress(cType string, fileBytes []byte) ([]byte, error) {
switch cType { switch cType {
case "zip": case "zip":
return decompressZip(fileBytes) return decompressZip(fileBytes)

View file

@ -35,7 +35,7 @@ func CreateRegistry(index UpdateIndex) (Registry, error) {
} }
// Parse bundle // Parse bundle
var err error var err error
registry.bundle, err = ParseBundle(filepath.Join(index.Directory, index.IndexFile)) registry.bundle, err = LoadBundle(filepath.Join(index.Directory, index.IndexFile))
if err != nil { if err != nil {
return Registry{}, err return Registry{}, err
} }
@ -57,7 +57,7 @@ func CreateRegistry(index UpdateIndex) (Registry, error) {
func (r *Registry) performUpgrade(downloadDir string, indexFile string) error { func (r *Registry) performUpgrade(downloadDir string, indexFile string) error {
// Make sure provided update is valid // Make sure provided update is valid
indexFilepath := filepath.Join(downloadDir, indexFile) indexFilepath := filepath.Join(downloadDir, indexFile)
bundle, err := ParseBundle(indexFilepath) bundle, err := LoadBundle(indexFilepath)
if err != nil { if err != nil {
return fmt.Errorf("invalid update: %w", err) return fmt.Errorf("invalid update: %w", err)
} }