Add configurable indexing batch size and delay; enhance index creation settings

Dmitriy Kazimirov 2025-04-06 13:30:02 +00:00
parent d7afc2907f
commit 2eb04397e5
3 changed files with 76 additions and 17 deletions

View file

@ -69,4 +69,10 @@ ADMIN_PASSWORD=securepassword123
ELASTICSEARCH_USERNAME = admin
# Elastic password
ELASTICSEARCH_PASSWORD = password
# Indexing batch size (smaller batches use less memory)
INDEXING_BATCH_SIZE=100
# Delay between batches in seconds (helps prevent memory spikes)
INDEXING_BATCH_DELAY=0.5
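These two knobs trade indexing throughput for a flatter memory profile. A rough back-of-the-envelope sketch of the cost, using a hypothetical corpus size and the defaults above (the indexer skips the delay after the final batch):

import math

# Hypothetical corpus size; only batch_size and batch_delay come from this file.
total_files = 10_000
batch_size = 100       # INDEXING_BATCH_SIZE
batch_delay = 0.5      # INDEXING_BATCH_DELAY, in seconds

batches = math.ceil(total_files / batch_size)
added_sleep = (batches - 1) * batch_delay  # no pause after the last batch
print(f"{batches} batches, ~{added_sleep:.0f}s of added delay")  # 100 batches, ~50s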

View file

@ -13,6 +13,8 @@ services:
      - ADMIN_PASSWORD=${ADMIN_PASSWORD}
      - SNIPPET_CHAR_LIMIT=${SNIPPET_CHAR_LIMIT}
      - ITEMS_PER_PAGE=${ITEMS_PER_PAGE}
      - INDEXING_BATCH_SIZE=${INDEXING_BATCH_SIZE}
      - INDEXING_BATCH_DELAY=${INDEXING_BATCH_DELAY}
    volumes:
      - ${SMB_SHARE_PATH}:/books
    depends_on:
@ -22,7 +24,8 @@ services:
      resources:
        limits:
          cpus: ${CPU_LIMIT}
-          memory: 2G
+          memory: 4G

  booksearch_elastic:
    container_name: booksearch_elastic
@ -35,10 +38,34 @@ services:
      - ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME}
      - ELASTICSEARCH_PASSWORD=${ELASTICSEARCH_PASSWORD}
      - ELASTICSEARCH_PLUGINS=analysis-stempel
      - ES_JAVA_OPTS=-Xms6g -Xmx6g
      - bootstrap.memory_lock=true
    restart: unless-stopped
    mem_limit: 8g
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9200/_nodes/plugins?filter_path=nodes.*.plugins"]
      interval: 30s
      timeout: 10s
      retries: 5
# Monitoring service
# booksearch_monitor:
# image: cadvisor/cadvisor:latest
# privileged: true
# devices:
# - /dev/kmsg:/dev/kmsg
# volumes:
# - /:/rootfs:ro
# - /var/run:/var/run:ro
# - /sys:/sys:ro
# - /var/lib/docker/:/var/lib/docker:ro
# - /dev/disk/:/dev/disk:ro
# - /cgroup:/cgroup:ro
# ports:
# - "8085:8080"
# restart: unless-stopped
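The healthcheck hits the nodes-info API, so the container only reports healthy once the analysis-stempel plugin is actually loaded. The same check can be run by hand from the host; a minimal sketch, assuming the cluster answers on localhost:9200 without authentication (the same assumption the curl-based healthcheck makes):

import json
import urllib.request

# Same endpoint the compose healthcheck curls; host and port are assumptions.
URL = "http://localhost:9200/_nodes/plugins?filter_path=nodes.*.plugins"

with urllib.request.urlopen(URL, timeout=10) as resp:
    nodes = json.load(resp)["nodes"]

for node_id, info in nodes.items():
    plugins = {p["name"] for p in info["plugins"]}
    print(node_id, "analysis-stempel loaded:", "analysis-stempel" in plugins)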

View file

@ -104,7 +104,17 @@ progress_lock = Lock()
def create_index():
    if not es.indices.exists(index=INDEX_NAME):
-        es.indices.create(index=INDEX_NAME)
+        es.indices.create(
+            index=INDEX_NAME,
+            body={
+                "settings": {
+                    "index.refresh_interval": "5s",
+                    "index.number_of_replicas": 0,
+                    "index.translog.durability": "async",
+                    "index.mapping.total_fields.limit": 10000
+                }
+            }
+        )

# TODO: remove old version?
def extract_text_from_epub_old(epub_path):
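The new index settings favour bulk-load speed: a 5s refresh interval, zero replicas, and an async translog reduce I/O while the corpus is first indexed, at the cost of search freshness and durability. If those trade-offs should only apply during the initial load, they can be relaxed later through the update-settings API; a minimal sketch in the same client style as the create call above (the helper name is hypothetical and not part of this commit):

def restore_index_settings(es_client, index_name):
    """Hypothetical follow-up: revert bulk-load settings to safer defaults."""
    es_client.indices.put_settings(
        index=index_name,
        body={
            "index.refresh_interval": "1s",          # default refresh cadence
            "index.translog.durability": "request"   # fsync on every request again
        }
    )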
@ -400,12 +410,15 @@ def index_single_file(file_path):
        return {"status": "error", "file_path": file_path, "error": str(e)}

def index_files(directory):
-    """Index files using parallel processing"""
+    """Index files using parallel processing with configurable batch size"""
    global indexing_progress
    import multiprocessing

-    # Get CPU configuration from environment or use sensible default
+    # Get configuration from environment
    cpu_limit = os.environ.get("CPU_LIMIT")
    batch_size = int(os.environ.get("INDEXING_BATCH_SIZE", "100"))  # Default 100 files per batch
    batch_delay = float(os.environ.get("INDEXING_BATCH_DELAY", "0.5"))  # Default 0.5s delay

    available_cpus = multiprocessing.cpu_count()
    used_cpus = int(float(cpu_limit)) if cpu_limit else max(1, available_cpus - 1)
@ -417,7 +430,9 @@ def index_files(directory):
        'is_running': True,
        'current_file': '',
        'errors': [],
-        'cpu_count': used_cpus
+        'cpu_count': used_cpus,
+        'batch_size': batch_size,
+        'batch_delay': batch_delay
    }

    try:
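Since the progress dictionary now also carries batch_size and batch_delay, anything that reads indexing_progress (a status endpoint, for example) can report the throttling in effect. A minimal sketch of a lock-safe reader, assuming only the progress_lock and indexing_progress globals shown in this file (the helper itself is hypothetical):

def get_progress_snapshot():
    """Hypothetical helper: copy the shared state under the lock so readers
    never see a half-updated view."""
    with progress_lock:
        return dict(indexing_progress)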
@ -433,18 +448,29 @@ def index_files(directory):
        with progress_lock:
            indexing_progress['total_files'] = len(files_to_index)

-        # Process files in parallel using a process pool
-        # Use chunksize to optimize for many small files
-        chunksize = max(1, len(files_to_index) // (used_cpus * 10))
+        # Process files in batches to control memory usage
+        success_count = error_count = skipped_count = 0
+        chunksize = max(1, batch_size // used_cpus)

        with multiprocessing.Pool(processes=used_cpus) as pool:
-            # Map the index_single_file function to all files
-            results = pool.map(index_single_file, files_to_index, chunksize=chunksize)
-            # Process results if needed
-            error_count = sum(1 for r in results if r.get('status') == 'error')
-            success_count = sum(1 for r in results if r.get('status') == 'success')
-            skipped_count = sum(1 for r in results if r.get('status') == 'skipped')
+            for i in range(0, len(files_to_index), batch_size):
+                batch = files_to_index[i:i + batch_size]
+                # Process current batch
+                results = pool.map(index_single_file, batch, chunksize=chunksize)
+                # Update counts
+                error_count += sum(1 for r in results if r.get('status') == 'error')
+                success_count += sum(1 for r in results if r.get('status') == 'success')
+                skipped_count += sum(1 for r in results if r.get('status') == 'skipped')
+                # Update progress
+                with progress_lock:
+                    indexing_progress['processed_files'] = i + len(batch)
+                # Add delay between batches if configured
+                if batch_delay > 0 and i + batch_size < len(files_to_index):
+                    time.sleep(batch_delay)

        print(f"Indexing complete: {success_count} succeeded, {error_count} failed, {skipped_count} skipped")
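Stripped of the shared-state bookkeeping, the new loop is a reusable batch-with-delay pattern; a self-contained sketch with simplified names (not the ones used above):

import time
import multiprocessing

def map_in_batches(worker, items, processes, batch_size=100, batch_delay=0.5):
    """Run worker over items in fixed-size batches, pausing between batches
    so memory use stays bounded instead of spiking on one huge map() call."""
    results = []
    chunksize = max(1, batch_size // processes)
    with multiprocessing.Pool(processes=processes) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            results.extend(pool.map(worker, batch, chunksize=chunksize))
            if batch_delay > 0 and start + batch_size < len(items):
                time.sleep(batch_delay)
    return results

Called with index_single_file as the worker, this is essentially what index_files now does, minus the progress updates and per-status counters.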