BookfusionPluginsResearch/BookfusionBackSync/sync_worker.py

497 lines
19 KiB
Python

__license__ = 'GPL v3'
import json
import logging
import http.client
import os
import tempfile
import time
import uuid
import urllib.request
import urllib.parse
import urllib.error
from datetime import datetime
from PyQt5.Qt import QThread, pyqtSignal
from calibre_plugins.bookfusionbacksync.config import prefs
# Root of the BookFusion REST API; versioned paths (/v1, /v3) are appended per call.
_API_BASE = 'https://bookfusion.com/api'
# Headers for GET requests. The User-Agent mimics the official Android app —
# presumably required for the mobile API endpoints; TODO confirm.
_GET_HEADERS = {
    'Accept': 'application/json, application/*+json',
    'User-Agent': 'BookFusion/2.22.0 (Android 12; Xiaomi 2201117TG; arm64-v8a)',
}
# POST requests additionally declare a JSON body.
_POST_HEADERS = {
    **_GET_HEADERS,
    'Content-Type': 'application/json',
}
# Default page size for the paginated library fetch (clamped to 10..50000 in _sync).
_DEFAULT_FETCH_PAGE_SIZE = 1337
# Default per-request timeout in seconds (clamped to 5..600 in _sync).
_DEFAULT_FETCH_TIMEOUT = 90
# Max SKIP lines written to the debug log per category unless full_skip_logs is on.
_SKIP_LOG_SAMPLE_LIMIT = 20
# Reading-position percentage at/above which a book counts as completed.
_DEFAULT_COMPLETED_THRESHOLD = 98.9
class SyncWorker(QThread):
    """Background thread that pulls reading data from BookFusion and writes
    last-read dates and completed flags into Calibre custom columns.

    Communicates with the UI exclusively through Qt signals; cancellation is
    cooperative via stop().
    """
    # Free-form line for the UI log pane.
    log_message = pyqtSignal(str)
    # fetch_progress(current, total): progressive estimate for initial network fetch
    fetch_progress = pyqtSignal(int, int)
    # progress(current, total): scanning/sync phase, mapped to percent in UI
    progress = pyqtSignal(int, int)
    # Short human-readable status line for the UI.
    status = pyqtSignal(str)
    finished = pyqtSignal(int, int)  # updated, skipped
    def __init__(self, db, library_path=None):
        """db: Calibre database API object; library_path: directory for the
        log file (falls back to db.library_path, then the temp dir)."""
        QThread.__init__(self)
        self.db = db
        self.library_path = library_path
        self._stop = False  # cooperative cancellation flag, set by stop()
        self._log = logging.getLogger('bookfusionbacksync')
    def stop(self):
        # Request cancellation; checked at loop boundaries in the worker.
        self._stop = True
def run(self):
    """Thread entry point: execute the sync and convert any fatal error
    into status/log signals plus a no-op finished(0, 0) result."""
    try:
        self._sync()
    except Exception as err:
        # Record the traceback in the file log, then surface the error in
        # both the status bar and the UI log pane before signalling the end.
        self._log.exception('Fatal error in sync')
        self.status.emit(f'Error: {err}')
        ui_line = f'Fatal error: {err}'
        self.log_message.emit(ui_line)
        self.finished.emit(0, 0)
# ── File logging ─────────────────────────────────────────────────────────
def _setup_logging(self):
    """Configure and return the 'bookfusionbacksync' file logger.

    The log file lives in the Calibre library folder when known, otherwise
    in the system temp directory. Existing handlers are dropped first so
    repeated syncs do not write duplicate lines.
    """
    base_path = self.library_path or getattr(self.db, 'library_path', None)
    if not base_path:
        base_path = tempfile.gettempdir()
    log_path = os.path.join(base_path, 'bookfusionbacksync.log')
    logger = logging.getLogger('bookfusionbacksync')
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()
    fh = logging.FileHandler(log_path, encoding='utf-8')
    fh.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)-8s %(message)s', '%Y-%m-%d %H:%M:%S'
    ))
    logger.addHandler(fh)
    # Bug fix: '' * 60 produced an empty string, so the intended visual
    # separator between sync runs was logged as a blank line. Use the same
    # box-drawing character as the file's section dividers.
    logger.info('─' * 60)
    logger.info('Sync started')
    return logger
# ── Main flow ────────────────────────────────────────────────────────────
def _sync(self):
    """Run the full sync pipeline.

    Steps: set up file logging, authenticate against the BookFusion API,
    fetch the remote library page by page, match remote records to local
    Calibre books via the 'bookfusion' identifier, then write last-read
    dates and completed flags into the configured custom columns.
    Emits progress/status/log signals throughout and finished(updated,
    skipped) at the end.
    """
    self._log = self._setup_logging()
    device = prefs['device']
    if not device:
        # First run on this machine: mint a stable pseudo device ID.
        device = uuid.uuid4().hex[:16]
        prefs['device'] = device
        self._log.info(f'Generated device ID: {device}')
    email = prefs['email']
    password = prefs['password']
    column = prefs['last_read_column']
    completed_column = prefs['completed_column']
    completed_threshold = _as_float(
        prefs['completed_threshold'],
        _DEFAULT_COMPLETED_THRESHOLD,
    )
    fetch_page_size = _as_int(
        prefs['fetch_page_size'],
        _DEFAULT_FETCH_PAGE_SIZE,
        min_value=10,
        max_value=50000,
    )
    fetch_timeout = _as_int(
        prefs['fetch_timeout'],
        _DEFAULT_FETCH_TIMEOUT,
        min_value=5,
        max_value=600,
    )
    full_skip_logs = bool(prefs['full_skip_logs'])
    self._log.info(
        f'Email: {email} Date column: {column} '
        f'Completed column: {completed_column or "(disabled)"} '
        f'Completed threshold: {completed_threshold:.2f}%'
    )
    # 1. Authenticate
    self.status.emit('Authenticating…')
    try:
        token = self._login(device, email, password)
    except urllib.error.HTTPError as exc:
        raise RuntimeError(
            f'Login failed (HTTP {exc.code}) — check email and password in Settings'
        )
    self._emit_log(f'Authenticated as {email}')
    if self._stop:
        return self.finished.emit(0, 0)
    # Pre-scan local identifiers once. Used both for matching and
    # as an initial estimate for fetch progress total.
    self.status.emit('Scanning Calibre library…')
    calibre_books = self._calibre_books_with_bf_id()
    self._emit_log(
        f'{len(calibre_books)} Calibre books have a BookFusion identifier'
    )
    expected_fetch_total = max(len(calibre_books), 1)
    # 2. Fetch all BookFusion books (paginated).
    self.status.emit('Fetching BookFusion library…')
    self._log.info(
        f'Fetch settings: per_page={fetch_page_size}, timeout={fetch_timeout}s'
    )
    bf_books = self._fetch_all_books(
        device,
        token,
        per_page=fetch_page_size,
        timeout=fetch_timeout,
        expected_total=expected_fetch_total,
    )
    self._emit_log(f'Fetched {len(bf_books)} books from BookFusion')
    if self._stop:
        return self.finished.emit(0, 0)
    # Build lookups for both id spaces used by BookFusion APIs:
    # - BookV3.id (user library record id)
    # - BookV3.book_id (global uploaded/catalog id, used by official Calibre plugin)
    bf_map_by_id = {}
    bf_map_by_book_id = {}
    bf_completed_by_id = {}
    bf_completed_by_book_id = {}
    bf_all_ids = set()
    bf_all_book_ids = set()
    invalid_date_count = 0
    invalid_percentage_count = 0
    for book in bf_books:
        bf_id = _id_key(book.get('id'))
        bf_book_id = _id_key(book.get('book_id'))
        if bf_id:
            bf_all_ids.add(bf_id)
        if bf_book_id:
            bf_all_book_ids.add(bf_book_id)
        # Prefer the explicit last_read_at; fall back to the reading
        # position's updated_at timestamp.
        date_str = book.get('last_read_at')
        source = 'last_read_at'
        if not date_str:
            rp = book.get('reading_position') or {}
            date_str = rp.get('updated_at')
            source = 'reading_position.updated_at'
        if date_str:
            dt = _parse_iso(date_str)
            if dt is None:
                invalid_date_count += 1
                continue
            if bf_id:
                bf_map_by_id[bf_id] = (dt, source)
            if bf_book_id:
                bf_map_by_book_id[bf_book_id] = (dt, source)
        rp = book.get('reading_position') or {}
        pct = _as_float(rp.get('percentage'))
        if pct is None:
            # Only count as invalid if a percentage was present but unparseable.
            if rp.get('percentage') is not None:
                invalid_percentage_count += 1
            continue
        if pct >= completed_threshold:
            if bf_id:
                bf_completed_by_id[bf_id] = pct
            if bf_book_id:
                bf_completed_by_book_id[bf_book_id] = pct
    self._emit_log(
        f'BookFusion date map: by book_id={len(bf_map_by_book_id)}, by id={len(bf_map_by_id)}, '
        f'invalid_dates={invalid_date_count}'
    )
    self._emit_log(
        f'BookFusion completed map (threshold>={completed_threshold:.2f}%): '
        f'by book_id={len(bf_completed_by_book_id)}, by id={len(bf_completed_by_id)}, '
        f'invalid_percentage={invalid_percentage_count}'
    )
    self._emit_log(
        f'BookFusion ids observed: book_id={len(bf_all_book_ids)}, id={len(bf_all_ids)}'
    )
    # 3. Match/write phase (progress bar switches to determinate mode)
    self.status.emit('Scanning Calibre library…')
    def _calibre_sort_key(item):
        # Books with a known read date first, newest first; unmatched last.
        _, bf_id = item
        entry = bf_map_by_book_id.get(bf_id) or bf_map_by_id.get(bf_id)
        if entry is None:
            return (1, 0.0)
        dt, _ = entry
        return (0, -dt.timestamp())
    calibre_books.sort(key=_calibre_sort_key)
    self._log.info('Calibre processing order: newest read dates first; books without dates last')
    total = len(calibre_books)
    date_updates = {}
    completed_updates = {}
    skipped = 0
    date_matched_by_book_id = 0
    date_matched_by_id = 0
    completed_matched_by_book_id = 0
    completed_matched_by_id = 0
    skipped_no_date = 0
    skipped_not_in_api = 0
    skip_no_date_samples = 0
    skip_not_in_api_samples = 0
    last_scan_percent = -1
    for i, (cal_id, bf_id) in enumerate(calibre_books):
        if self._stop:
            break
        if total > 0:
            # Throttle progress emission to one signal per whole percent.
            scan_percent = int(i * 100 / total)
            if scan_percent != last_scan_percent:
                last_scan_percent = scan_percent
                self.progress.emit(i, total)
        date_entry = bf_map_by_book_id.get(bf_id)
        date_match_via = 'book_id'
        if date_entry is None:
            date_entry = bf_map_by_id.get(bf_id)
            date_match_via = 'id'
        completed_pct = None
        completed_match_via = 'book_id'
        if completed_column:
            completed_pct = bf_completed_by_book_id.get(bf_id)
            if completed_pct is None:
                completed_pct = bf_completed_by_id.get(bf_id)
                completed_match_via = 'id'
        if date_entry is None and completed_pct is None:
            skipped += 1
            if bf_id in bf_all_book_ids or bf_id in bf_all_ids:
                skipped_no_date += 1
                if full_skip_logs or skip_no_date_samples < _SKIP_LOG_SAMPLE_LIMIT:
                    title = self._book_title(cal_id)
                    self._log.debug(
                        f'SKIP {title!r} — found in BookFusion but has no parseable read date '
                        f'(bf_id={bf_id})'
                    )
                    skip_no_date_samples += 1
            else:
                skipped_not_in_api += 1
                if full_skip_logs or skip_not_in_api_samples < _SKIP_LOG_SAMPLE_LIMIT:
                    title = self._book_title(cal_id)
                    self._log.debug(
                        f'SKIP {title!r} — not in BookFusion library (bf_id={bf_id})'
                    )
                    skip_not_in_api_samples += 1
            continue
        title = self._book_title(cal_id)
        if date_entry is not None:
            dt, source = date_entry
            date_updates[cal_id] = dt
            if date_match_via == 'book_id':
                date_matched_by_book_id += 1
            else:
                date_matched_by_id += 1
            # Bug fix: title and date were concatenated with no separator
            # ('OK TitleName2024-01-01'); use ' → ' to match the
            # Completed log line below.
            self._emit_log(
                f'OK {title} → {dt.strftime("%Y-%m-%d")} '
                f'(from {source}, match={date_match_via})'
            )
        if completed_pct is not None:
            completed_updates[cal_id] = True
            if completed_match_via == 'book_id':
                completed_matched_by_book_id += 1
            else:
                completed_matched_by_id += 1
            self._emit_log(
                f'OK {title} → Completed=True '
                f'(percentage={completed_pct:.3f}, threshold={completed_threshold:.2f}, '
                f'match={completed_match_via})'
            )
    if self._stop:
        self.log_message.emit('Sync cancelled.')
        self._log.info('Sync cancelled by user')
        return self.finished.emit(len(date_updates), skipped)
    # 4. Write all updates to Calibre in one call
    if date_updates:
        self.status.emit(f'Writing {len(date_updates)} dates to Calibre…')
        self._log.info(f'Writing {len(date_updates)} dates to column {column!r}')
        self.db.set_field(column, date_updates)
        self._emit_log(f'Applied {len(date_updates)} date updates to {column}')
    if completed_column and completed_updates:
        self.status.emit(f'Writing {len(completed_updates)} completed flags to Calibre…')
        self._log.info(
            f'Writing {len(completed_updates)} completed flags to column {completed_column!r}'
        )
        self.db.set_field(completed_column, completed_updates)
        self._emit_log(
            f'Applied {len(completed_updates)} completed flags to {completed_column}'
        )
    self._log.info(
        'Match stats — '
        f'date_updates={len(date_updates)}, '
        f'date_match_book_id={date_matched_by_book_id}, date_match_id={date_matched_by_id}, '
        f'completed_updates={len(completed_updates)}, '
        f'completed_match_book_id={completed_matched_by_book_id}, '
        f'completed_match_id={completed_matched_by_id}, '
        f'skipped_no_date={skipped_no_date}, skipped_not_in_api={skipped_not_in_api}, '
        f'logged_skip_lines={skip_no_date_samples + skip_not_in_api_samples}, '
        f'full_skip_logs={full_skip_logs}'
    )
    self._log.info(f'Sync finished — {len(date_updates)} updated, {skipped} skipped')
    self.progress.emit(total, total)
    self.finished.emit(len(date_updates), skipped)
# ── HTTP with exponential back-off ───────────────────────────────────────
def _fetch_json(self, url, post_body=None, timeout=20):
    """GET or POST, returning parsed JSON.
    Retries up to 3 times on network/timeout errors with delays 2 s → 4 s.
    Raises urllib.error.HTTPError immediately (4xx/5xx are not transient).
    """
    # POST requests carry the JSON Content-Type header; GETs do not.
    headers = _POST_HEADERS if post_body is not None else _GET_HEADERS
    retry_delays = [2, 4]  # seconds to wait after attempt 1 and attempt 2
    last_exc = None
    for attempt in range(3):
        # Honour cancellation between attempts.
        if self._stop:
            raise RuntimeError('Sync stopped')
        method = 'POST' if post_body is not None else 'GET'
        self._log.debug(f'{method} {url} (attempt {attempt + 1}/3)')
        try:
            req = urllib.request.Request(url, data=post_body, headers=headers)
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                data = resp.read()
                self._log.debug(f'{resp.status} {len(data)} bytes')
                return json.loads(data)
        # NOTE: HTTPError subclasses URLError, so this clause must stay first.
        except urllib.error.HTTPError:
            raise  # auth failures, 404, etc. — no retry
        except (urllib.error.URLError, OSError, TimeoutError, http.client.IncompleteRead) as exc:
            last_exc = exc
            self._log.warning(f' Attempt {attempt + 1}/3 failed: {exc}')
            if attempt < 2:
                wait = retry_delays[attempt]
                msg = f'Network error — retrying in {wait}s… ({exc})'
                self.log_message.emit(msg)
                self._log.info(f' Waiting {wait}s before retry')
                time.sleep(wait)
    # All three attempts exhausted.
    raise RuntimeError(f'Failed after 3 attempts: {last_exc}')
# ── Auth ─────────────────────────────────────────────────────────────────
def _login(self, device, email, password):
    """Authenticate against the BookFusion API and return the session token.

    Raises urllib.error.HTTPError on bad credentials (propagated from
    _fetch_json, which does not retry HTTP errors).
    """
    credentials = {'device': device, 'login': email, 'password': password}
    payload = json.dumps(credentials).encode('utf-8')
    response = self._fetch_json(f'{_API_BASE}/v1/auth.json', post_body=payload)
    return response['token']
# ── BookFusion library fetch ──────────────────────────────────────────────
def _fetch_all_books(self, device, token, per_page, timeout, expected_total):
    """Fetch the whole BookFusion library, one page at a time.

    device/token: API credentials; per_page: page size; timeout: per-request
    timeout in seconds; expected_total: initial guess for the progress total
    (the local count of books with a BookFusion identifier).
    Returns the concatenated list of book dicts from all pages.
    """
    all_books = []
    page = 1
    pages_fetched = 0
    # The API gives no grand total, so the progress denominator is a moving
    # estimate that only ever grows as pages arrive.
    total_estimate = max(expected_total, 1)
    self.fetch_progress.emit(0, total_estimate)
    while not self._stop:
        params = urllib.parse.urlencode({
            'device': device,
            'token': token,
            'page': page,
            'per_page': per_page,
        })
        page_data = self._fetch_json(
            f'{_API_BASE}/v3/library/books.json?{params}',
            timeout=timeout,
        )
        if not page_data:
            # Empty page: we read past the end; snap progress to 100%.
            self.fetch_progress.emit(total_estimate, total_estimate)
            break
        all_books.extend(page_data)
        pages_fetched += 1
        fetched = len(all_books)
        if fetched > total_estimate:
            total_estimate = fetched
        if len(page_data) == per_page and fetched >= total_estimate:
            # A full page implies at least one more may follow; pad the
            # estimate so the bar does not hit 100% prematurely.
            total_estimate = fetched + per_page
        self._log.debug(
            f'Page {page}: {len(page_data)} books (running total: {len(all_books)})'
        )
        self.fetch_progress.emit(min(fetched, total_estimate), total_estimate)
        self.status.emit(f'Fetching BookFusion library… {len(all_books)} books')
        if len(page_data) < per_page:
            # Short page: this was the last one.
            self.fetch_progress.emit(total_estimate, total_estimate)
            break
        page += 1
    return all_books
# ── Calibre helpers ──────────────────────────────────────────────────────
def _calibre_books_with_bf_id(self):
    """Return (calibre_id, bookfusion_id_str) pairs for every local book
    carrying a 'bookfusion' identifier."""
    pairs = []
    for cal_id in self.db.all_book_ids():
        identifiers = self.db.field_for('identifiers', cal_id) or {}
        bookfusion_id = identifiers.get('bookfusion')
        if not bookfusion_id:
            continue
        pairs.append((cal_id, str(bookfusion_id)))
    return pairs
def _book_title(self, cal_id):
    """Best-effort display title for a Calibre book id."""
    title = self.db.field_for('title', cal_id)
    if title:
        return title
    return f'Book #{cal_id}'
# ── Helpers ───────────────────────────────────────────────────────────────
def _emit_log(self, msg, level='info'):
    """Send msg to the UI log pane and also record it in the file log at
    the named level ('info', 'debug', 'warning', ...)."""
    self.log_message.emit(msg)
    log_fn = getattr(self._log, level)
    log_fn(msg)
# ── Utility ──────────────────────────────────────────────────────────────────
def _parse_iso(s):
"""Parse ISO 8601 string (Z or +00:00 suffix) into a tz-aware datetime."""
try:
return datetime.fromisoformat(s.replace('Z', '+00:00'))
except (ValueError, AttributeError):
return None
def _id_key(value):
if value is None:
return ''
return str(value)
def _as_float(value, default=None):
try:
if value is None:
return default
return float(value)
except (TypeError, ValueError):
return default
def _as_int(value, default, min_value=None, max_value=None):
try:
if value is None:
return default
result = int(value)
except (TypeError, ValueError):
return default
if min_value is not None and result < min_value:
return min_value
if max_value is not None and result > max_value:
return max_value
return result