(WIP) feat: Task Scheduler Management UI/UX Part 2

This commit is contained in:
Rafael Uzarowski 2025-04-06 20:57:07 +02:00
parent 82eebf730e
commit 282cbf90dc
No known key found for this signature in database
GPG key ID: DEDFAEC7F474C685
5 changed files with 161 additions and 55 deletions

View file

@ -1,14 +1,16 @@
from python.helpers.api import ApiHandler, Input, Output, Request
from python.helpers.task_scheduler import TaskScheduler, TaskState
from python.helpers.print_style import PrintStyle
class SchedulerTaskRun(ApiHandler):
_printer: PrintStyle = PrintStyle(italic=True, font_color="green", padding=False)
async def process(self, input: Input, request: Request) -> Output:
"""
Manually run a task from the scheduler by ID
"""
scheduler = TaskScheduler.get()
await scheduler.reload()
# Get task ID from input
task_id: str = input.get("task_id", "")
@ -16,15 +18,22 @@ class SchedulerTaskRun(ApiHandler):
if not task_id:
return {"error": "Missing required field: task_id"}
self._printer.print(f"SchedulerTaskRun: On-Demand running task {task_id}")
scheduler = TaskScheduler.get()
await scheduler.reload()
# Check if the task exists first
task = scheduler.get_task_by_uuid(task_id)
if not task:
self._printer.error(f"SchedulerTaskRun: Task with ID '{task_id}' not found")
return {"error": f"Task with ID '{task_id}' not found"}
# Check if task is already running
if task.state != TaskState.IDLE:
if task.state == TaskState.RUNNING:
# Return task details along with error for better frontend handling
serialized_task = scheduler.serialize_task(task_id)
self._printer.error(f"SchedulerTaskRun: Task '{task_id}' is in state '{task.state}' and cannot be run")
return {
"error": f"Task '{task_id}' is in state '{task.state}' and cannot be run",
"task": serialized_task
@ -33,6 +42,7 @@ class SchedulerTaskRun(ApiHandler):
# Run the task, which now includes atomic state checks and updates
try:
await scheduler.run_task_by_uuid(task_id)
self._printer.print(f"SchedulerTaskRun: Task '{task_id}' started successfully")
# Get updated task after run starts
serialized_task = scheduler.serialize_task(task_id)
if serialized_task:
@ -44,6 +54,8 @@ class SchedulerTaskRun(ApiHandler):
else:
return {"success": True, "message": f"Task '{task_id}' started successfully"}
except ValueError as e:
self._printer.error(f"SchedulerTaskRun: Task '{task_id}' failed to start: {str(e)}")
return {"error": str(e)}
except Exception as e:
self._printer.error(f"SchedulerTaskRun: Task '{task_id}' failed to start: {str(e)}")
return {"error": f"Failed to run task '{task_id}': {str(e)}"}

View file

@ -1,12 +1,15 @@
import asyncio
from datetime import datetime, timezone, timedelta
import json
import os
import random
import threading
from urllib.parse import urlparse
import uuid
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
from enum import Enum
from os.path import exists
from typing import ClassVar, Literal, Optional, Union, Dict, Any, Type, TypeVar, cast
from typing import Any, Callable, Coroutine, Dict, Literal, Optional, Type, TypeVar, Union, cast, ClassVar
import nest_asyncio
nest_asyncio.apply()
@ -14,12 +17,14 @@ nest_asyncio.apply()
from crontab import CronTab
from pydantic import BaseModel, Field, PrivateAttr
from agent import Agent, AgentContext, UserMessage
from agent import Agent, AgentConfig, AgentContext, UserMessage
from initialize import initialize
from python.helpers.persist_chat import load_tmp_chats, save_tmp_chat
from python.helpers.print_style import PrintStyle
from python.helpers.defer import DeferredTask
from python.helpers.files import make_dirs, write_file, get_abs_path, read_file
from python.helpers.files import get_abs_path, list_files, make_dirs, read_file, write_file
from python.helpers.persist_chat import load_tmp_chats, save_tmp_chat
from python.helpers.print_style import PrintStyle
SCHEDULER_FOLDER = "memory/scheduler"
@ -115,7 +120,7 @@ class AdHocTask(BaseModel):
self.token = token
self.updated_at = datetime.now(timezone.utc)
def check_schedule(self) -> bool:
def check_schedule(self, frequency_seconds: float = 60.0) -> bool:
with self._lock:
return False
@ -287,9 +292,10 @@ class SchedulerTaskList(BaseModel):
with self._lock:
return self.tasks
def get_due_tasks(self) -> list[Union[ScheduledTask, AdHocTask]]:
async def get_due_tasks(self) -> list[Union[ScheduledTask, AdHocTask]]:
with self._lock:
return [task for task in self.tasks if task.check_schedule()]
await self.reload()
return [task for task in self.tasks if task.check_schedule() and task.state == TaskState.IDLE]
def get_task_by_uuid(self, task_uuid: str) -> Union[ScheduledTask, AdHocTask] | None:
with self._lock:
@ -356,7 +362,7 @@ class TaskScheduler:
return self._tasks.get_task_by_name(name)
async def tick(self):
for task in self._tasks.get_due_tasks():
for task in await self._tasks.get_due_tasks():
await self._run_task(task)
async def run_task_by_uuid(self, task_uuid: str):
@ -432,15 +438,9 @@ class TaskScheduler:
if task_snapshot is None:
self._printer.print(f"Scheduler Task with UUID '{task_uuid}' not found")
return
if not isinstance(task_snapshot, ScheduledTask):
self._printer.error(f"Scheduler Task '{task_snapshot.name}' is not an ScheduledTask, this should not happen, skipping")
return
if task_snapshot.state == TaskState.RUNNING:
self._printer.print(f"Scheduler Task '{task_snapshot.name}' already running, skipping")
return
if task_snapshot.state != TaskState.IDLE:
self._printer.print(f"Scheduler Task '{task_snapshot.name}' state is '{task_snapshot.state}', skipping")
return
# Atomically fetch and check the task's current state
current_task = await self.update_task(task_uuid, state=TaskState.RUNNING)
@ -456,6 +456,11 @@ class TaskScheduler:
self._printer.print(f"Scheduler Task '{current_task.name}' started")
context = await self._get_chat_context(current_task)
# Ensure the context is properly registered in the AgentContext._contexts
# This is critical for the polling mechanism to find and stream logs
AgentContext._contexts[context.id] = context
agent = Agent(0, context.config, context)
# Prepare attachment filenames for logging
@ -463,7 +468,16 @@ class TaskScheduler:
if current_task.attachments:
for attachment in current_task.attachments:
if os.path.exists(attachment):
attachment_filenames.append(os.path.basename(attachment))
attachment_filenames.append(attachment)
else:
try:
url = urlparse(attachment)
if url.scheme in ["http", "https", "ftp", "ftps", "sftp"]:
attachment_filenames.append(attachment)
else:
self._printer.print(f"Skipping attachment: [{attachment}]")
except Exception:
self._printer.print(f"Skipping attachment: [{attachment}]")
self._printer.print("User message:")
self._printer.print(f"> {current_task.prompt}")
@ -485,8 +499,10 @@ class TaskScheduler:
UserMessage(
message=current_task.prompt,
system_message=[current_task.system_prompt],
attachments=[]))
attachments=attachment_filenames))
# Persist after setting up the context but before running the agent
# This ensures the task context is saved and can be found by polling
await self._persist_chat(current_task, context)
result = await agent.monologue()
@ -578,7 +594,7 @@ def parse_task_schedule(schedule_data: Dict[str, str]) -> TaskSchedule:
weekday=schedule_data.get('weekday', '*')
)
except Exception as e:
raise ValueError(f"Invalid schedule format: {e}")
raise ValueError(f"Invalid schedule format: {e}") from e
T = TypeVar('T', bound=Union[ScheduledTask, AdHocTask])

View file

@ -625,8 +625,7 @@
</nav>
<div id="section-task-scheduler" class="section"
x-data="schedulerSettings"
x-init="$watch('activeTab', (val) => { if(val === 'scheduler') { fetchTasks(); } })">
x-data="schedulerSettings">
<div class="section-title">Task Scheduler</div>
<div class="section-description">Manage scheduled tasks and automated processes for Agent Zero.</div>

View file

@ -5,8 +5,6 @@
// Add a document ready event handler to ensure the scheduler tab can be clicked on first load
document.addEventListener('DOMContentLoaded', function() {
console.log('DOMContentLoaded: Setting up scheduler tab click handler');
// Setup scheduler tab click handling
const setupSchedulerTab = () => {
const settingsModal = document.getElementById('settingsModal');
@ -15,15 +13,12 @@ document.addEventListener('DOMContentLoaded', function() {
return;
}
console.log('Setting up click interceptor for scheduler tab');
// Create a global event listener for clicks on the scheduler tab
document.addEventListener('click', function(e) {
// Find if the click was on the scheduler tab or its children
const schedulerTab = e.target.closest('.settings-tab[title="Task Scheduler"]');
if (!schedulerTab) return;
console.log('Intercepted click on scheduler tab');
e.preventDefault();
e.stopPropagation();
@ -31,7 +26,6 @@ document.addEventListener('DOMContentLoaded', function() {
try {
const modalData = Alpine.$data(settingsModal);
if (modalData.activeTab !== 'scheduler') {
console.log(`Directly switching to scheduler tab via click interceptor.`);
// Directly call the modal's switchTab method
modalData.switchTab('scheduler');
}
@ -57,6 +51,7 @@ document.addEventListener('alpine:init', () => {
filterType: 'all', // all, scheduled, adhoc
filterState: 'all', // all, idle, running, disabled, error
pollingInterval: null,
pollingActive: false, // Track if polling is currently active
editingTask: null,
isCreating: false,
isEditing: false,
@ -108,32 +103,56 @@ document.addEventListener('alpine:init', () => {
// Use a small delay to ensure Alpine.js has fully initialized
// before fetching tasks, which helps prevent layout shift
setTimeout(() => {
// Initial fetch to populate the list
this.fetchTasks();
}, 50);
// Set up polling when component is active
this.$watch('$store.root.activeTab', (newTab, oldTab) => {
if (newTab === 'scheduler') {
// Only start polling if the modal is actually open
const store = Alpine.store('root');
if (store && store.isOpen === true) {
this.startPolling();
} else if (oldTab === 'scheduler') {
this.stopPolling();
}
});
}, 100);
// Initial polling if tab is active on load
if (this.$store.root.activeTab === 'scheduler') {
this.startPolling();
}
// Initialize safe watchers with defensive checks
this.$nextTick(() => {
// Watch the modal state from the root store
this.$watch('$store.root.isOpen', (isOpen) => {
// Only proceed if the value is not undefined
if (typeof isOpen !== 'undefined') {
if (isOpen === true) {
// Modal just opened
this.startPolling();
} else if (isOpen === false) {
// Modal closed, stop polling
this.stopPolling();
}
}
});
});
},
// Start polling for task updates
startPolling() {
// Don't start if already polling
if (this.pollingInterval) {
return;
}
this.pollingActive = true;
// Fetch immediately, then set up interval for every 2 seconds
this.fetchTasks();
this.pollingInterval = setInterval(() => this.fetchTasks(), 5000); // Poll every 5 seconds
this.pollingInterval = setInterval(() => {
if (this.pollingActive) {
this.fetchTasks();
}
}, 2000); // Poll every 2 seconds as requested
},
// Stop polling when tab is inactive
stopPolling() {
this.pollingActive = false;
if (this.pollingInterval) {
clearInterval(this.pollingInterval);
this.pollingInterval = null;
@ -142,6 +161,16 @@ document.addEventListener('alpine:init', () => {
// Fetch tasks from API
async fetchTasks() {
// Don't fetch if polling is inactive (prevents race conditions)
if (!this.pollingActive && this.pollingInterval) {
return;
}
// Don't fetch while creating/editing a task
if (this.isCreating || this.isEditing) {
return;
}
this.isLoading = true;
try {
const response = await fetch('/scheduler_tasks_list', {
@ -160,7 +189,10 @@ document.addEventListener('alpine:init', () => {
this.tasks = data.tasks || [];
} catch (error) {
console.error('Error fetching tasks:', error);
showToast('Failed to fetch tasks: ' + error.message, 'error');
// Only show toast for errors on manual refresh, not during polling
if (!this.pollingInterval) {
showToast('Failed to fetch tasks: ' + error.message, 'error');
}
} finally {
this.isLoading = false;
}

View file

@ -19,24 +19,23 @@ const settingsModalProxy = {
// Switch tab method
switchTab(tabName) {
console.log(`Switching tab from ${this.activeTab} to ${tabName}`);
// Update our component state
this.activeTab = tabName;
// Update the store safely
const store = Alpine.store('root');
if (store) {
store.activeTab = tabName;
}
localStorage.setItem('settingsActiveTab', tabName);
// Auto-scroll active tab into view after a short delay to ensure DOM updates
setTimeout(() => {
const activeTab = document.querySelector('.settings-tab.active');
if (activeTab) {
console.log(`Scrolling active tab into view: ${activeTab.textContent}`);
activeTab.scrollIntoView({ behavior: 'smooth', block: 'nearest', inline: 'center' });
} else {
console.warn('No active tab found to scroll into view');
}
// Debug the scheduler tab specifically
const schedulerTab = document.querySelector('.settings-tab[title="Task Scheduler"]');
console.log('Scheduler tab:', schedulerTab);
console.log('Scheduler tab active?', schedulerTab && schedulerTab.classList.contains('active'));
}, 10);
},
@ -45,6 +44,13 @@ const settingsModalProxy = {
const modalEl = document.getElementById('settingsModal');
const modalAD = Alpine.$data(modalEl);
// First, ensure the store is updated properly
const store = Alpine.store('root');
if (store) {
// Set isOpen first to ensure proper state
store.isOpen = true;
}
//get settings from backend
try {
const set = await sendJsonData("/settings_get", null);
@ -81,6 +87,12 @@ const settingsModalProxy = {
// Directly set the active tab
modalAD.activeTab = savedTab;
// Also update the store
if (store) {
store.activeTab = savedTab;
}
localStorage.setItem('settingsActiveTab', savedTab);
// Add a small delay *after* setting the tab to ensure scrolling works
@ -157,7 +169,18 @@ const settingsModalProxy = {
} else if (buttonId === 'cancel') {
this.handleCancel();
}
// First update our component state
this.isOpen = false;
// Then safely update the store
const store = Alpine.store('root');
if (store) {
// Use a slight delay to avoid reactivity issues
setTimeout(() => {
store.isOpen = false;
}, 10);
}
},
async handleCancel() {
@ -165,7 +188,18 @@ const settingsModalProxy = {
status: 'cancelled',
data: null
});
// First update our component state
this.isOpen = false;
// Then safely update the store
const store = Alpine.store('root');
if (store) {
// Use a slight delay to avoid reactivity issues
setTimeout(() => {
store.isOpen = false;
}, 10);
}
},
handleFieldButton(field) {
@ -191,6 +225,7 @@ const settingsModalProxy = {
// });
document.addEventListener('alpine:init', function () {
// Initialize the root store first to ensure it exists before components try to access it
Alpine.store('root', {
activeTab: localStorage.getItem('settingsActiveTab') || 'agent',
isOpen: false,
@ -200,6 +235,7 @@ document.addEventListener('alpine:init', function () {
}
});
// Then initialize other Alpine components
Alpine.data('settingsModal', function () {
return {
settingsData: {},
@ -208,21 +244,32 @@ document.addEventListener('alpine:init', function () {
isLoading: true,
async init() {
// Initialize with the store value
this.activeTab = Alpine.store('root').activeTab || 'agent';
// Watch store tab changes
this.$watch('$store.root.activeTab', (newTab) => {
this.activeTab = newTab;
localStorage.setItem('settingsActiveTab', newTab);
this.updateFilteredSections();
if (typeof newTab !== 'undefined') {
this.activeTab = newTab;
localStorage.setItem('settingsActiveTab', newTab);
this.updateFilteredSections();
}
});
// Load settings
await this.fetchSettings();
this.activeTab = this.$store.root.activeTab;
this.updateFilteredSections();
},
switchTab(tab) {
this.$store.root.activeTab = tab;
// Update our component state
this.activeTab = tab;
// Update the store safely
const store = Alpine.store('root');
if (store) {
store.activeTab = tab;
}
},
async fetchSettings() {