Using the Task Manager

When developing plugins for Bedrock Server Manager, you may encounter situations where a task takes a long time to complete (e.g., downloading large files, processing backups, making complex external API calls).

Running these tasks directly within an event hook or a FastAPI endpoint blocks the main application thread or event loop. This can cause the web interface to become unresponsive and potentially lead to WebSocket disconnections.
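
The blocking effect can be reproduced with the standard library alone. The sketch below is not Bedrock Server Manager code: heartbeat, count_ticks, and compare are hypothetical names, and asyncio.to_thread (Python 3.9+) merely stands in for an offloading mechanism. It counts how often a periodic coroutine gets to run while a 0.5 second job executes on the event loop versus in a worker thread.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    """Record a timestamp every 50 ms, like a server keeping connections alive."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def count_ticks(blocking: bool) -> int:
    """Run a 0.5 s job and count the heartbeats that happen while it runs."""
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # yield once so the heartbeat records its first tick
    before = len(ticks)
    if blocking:
        time.sleep(0.5)  # runs on the event loop thread: nothing else can run
    else:
        await asyncio.to_thread(time.sleep, 0.5)  # runs in a worker thread
    during = len(ticks) - before
    beat.cancel()
    return during


def compare() -> tuple:
    """Return (heartbeats while blocked, heartbeats while offloaded)."""
    blocked = asyncio.run(count_ticks(blocking=True))
    offloaded = asyncio.run(count_ticks(blocking=False))
    return blocked, offloaded
```

Calling compare() shows no heartbeats during the blocking run and several during the offloaded one, which is the same starvation that makes a web interface appear frozen.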

To solve this, Bedrock Server Manager provides a TaskManager to offload long-running operations to background threads.

Accessing the Task Manager

You can access the TaskManager instance via your plugin’s api object. The task manager is attached to the main application context:

task_manager = self.api.app_context.task_manager()

Running Background Tasks

The primary method you will use is run_task. This method submits a function to be executed in the background and immediately returns a unique task_id.
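
To make the submit-and-return behaviour concrete, here is a minimal stand-in written only from the description in this guide. It is an assumption-laden sketch, not the actual TaskManager: the class name SketchTaskManager, the use of one thread per task, the UUID task ids, and the "Task completed." message are all invented for illustration.

```python
import threading
import uuid
from typing import Any, Callable, Dict, Optional


class SketchTaskManager:
    """Illustrative stand-in only; not Bedrock Server Manager's TaskManager."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.Lock()

    def run_task(self, func: Callable[..., Any], *args: Any,
                 username: Optional[str] = None, **kwargs: Any) -> str:
        """Start func(*args, **kwargs) in a thread and return its id at once."""
        task_id = str(uuid.uuid4())
        with self._lock:
            self._tasks[task_id] = {
                "status": "in_progress",
                "message": "Task is running.",
                "result": None,
                "username": username,
            }
        worker = threading.Thread(
            target=self._execute, args=(task_id, func, args, kwargs), daemon=True
        )
        worker.start()
        return task_id  # returned before func has finished

    def _execute(self, task_id: str, func: Callable[..., Any],
                 args: tuple, kwargs: dict) -> None:
        """Run the function and record either its result or its error."""
        try:
            result = func(*args, **kwargs)
            update = {"status": "success", "message": "Task completed.",
                      "result": result}
        except Exception as exc:
            update = {"status": "error", "message": str(exc)}
        with self._lock:
            self._tasks[task_id].update(update)

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the task record, or None for an unknown id."""
        with self._lock:
            task = self._tasks.get(task_id)
            return dict(task) if task else None
```

The point of the sketch is the control flow: the caller gets an id back immediately and looks up the outcome later, so the request handler never waits on the work itself.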

Example: A Long-Running Task

Here is an example of a plugin that provides a web endpoint to trigger a long-running background task.

import time
import logging
from fastapi import APIRouter, Depends
from bedrock_server_manager import PluginBase

# It is recommended to import the application context dependency
# to cleanly access the task manager from decoupled FastAPI routers.
from bedrock_server_manager.web.dependencies import get_app_context
from bedrock_server_manager.web.auth_utils import get_current_user
from bedrock_server_manager.context import AppContext
from bedrock_server_manager.web.schemas import User

logger = logging.getLogger(__name__)

plugin_web_router = APIRouter(
    prefix="/my_task_plugin",
    tags=["My Task Plugin"]
)

def my_long_running_function(seconds: int, name: str):
    """This function runs in the background."""
    logger.info(f"Starting long task for {name}...")
    # Simulate a long-running process
    time.sleep(seconds)
    logger.info(f"Finished long task for {name}!")
    return f"Processed {name} successfully in {seconds} seconds."

@plugin_web_router.post("/start_task")
async def start_task(
    seconds: int = 5,
    name: str = "example",
    app_context: AppContext = Depends(get_app_context),
    current_user: User = Depends(get_current_user)
):
    """Starts a task using the injected AppContext and User."""
    task_manager = app_context.task_manager()

    task_id = task_manager.run_task(
        my_long_running_function,
        username=current_user.username, # Provide the username for targeted WebSocket UI updates
        seconds=seconds,
        name=name
    )

    return {"status": "success", "task_id": task_id, "message": "Task started in the background."}

class MyTaskPlugin(PluginBase):
    version = "1.0.0"

    def on_load(self):
        self.logger.info("MyTaskPlugin loaded.")

        # We can attach the task manager or API to the router's state if needed
        # Or, we can define the endpoint within the class methods if we want direct `self` access

    def get_fastapi_routers(self):
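        # Alternative to the module-level router above: defining the routes here
        # gives them direct access to `self.api`. Both versions declare
        # POST /my_task_plugin/start_task, so use only one of them.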

        router = APIRouter(prefix="/my_task_plugin", tags=["My Task Plugin"])

        @router.post("/start_task")
        async def trigger_task(
            seconds: int = 5,
            name: str = "example",
            current_user: User = Depends(get_current_user)
        ):
            task_manager = self.api.app_context.task_manager()

            # Submit the function to the background.
            # *args and **kwargs passed to run_task will be forwarded to your function.
            task_id = task_manager.run_task(
                my_long_running_function,
                username=current_user.username, # Provide the username for targeted WebSocket UI updates
                seconds=seconds,  # kwarg for my_long_running_function
                name=name         # kwarg for my_long_running_function
            )

            return {"status": "success", "task_id": task_id, "message": "Task started in the background."}

        @router.get("/task_status/{task_id}")
        async def get_task_status(task_id: str):
            task_manager = self.api.app_context.task_manager()
            task_details = task_manager.get_task(task_id)

            if not task_details:
                return {"status": "error", "message": "Task not found."}

            return {"status": "success", "task": task_details}

        return [router]

Tracking Task Status and UI Updates

When you submit a task using run_task, the TaskManager automatically tracks its status (in_progress, success, error).

Calling get_task with a task_id returns a dictionary containing the task details (the example above treats an empty result as "task not found"):

{
    "status": "in_progress", // Or "success", "error"
    "message": "Task is running.", // Or the error/success message
    "result": null, // Will contain the return value of your function upon success
    "username": "admin"
}
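
A caller that only has this dictionary to go on, such as a script or a test, can poll until the status leaves in_progress. The helper below is a sketch under stated assumptions: wait_for_task is a hypothetical name, and it accepts any callable that behaves like get_task (returning a dictionary with a "status" key, or nothing for an unknown id) rather than depending on the real TaskManager.

```python
import time
from typing import Callable, Optional


def wait_for_task(
    get_task: Callable[[str], Optional[dict]],
    task_id: str,
    timeout: float = 30.0,
    interval: float = 0.5,
) -> dict:
    """Poll get_task(task_id) until the task is no longer "in_progress"."""
    deadline = time.monotonic() + timeout
    while True:
        task = get_task(task_id)
        if not task:
            raise KeyError(f"Task not found: {task_id}")
        if task["status"] != "in_progress":
            return task  # status is now "success" or "error"
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still running after {timeout} s")
        time.sleep(interval)  # blocking sleep: do not call from the event loop
```

Because the helper sleeps synchronously, it suits scripts and tests; calling it from an async endpoint would reintroduce the blocking problem the task manager is meant to avoid.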

WebSocket Notifications

If you provide the username argument when calling run_task, the TaskManager will automatically send WebSocket notifications to that specific user whenever the task status updates (e.g., when it completes or fails).

The frontend can listen for these notifications on the task:{task_id} topic and update the UI without repeatedly polling a status endpoint such as the /task_status/{task_id} route from the example above.

Handling Exceptions

If your background function raises an exception, the task manager will catch it, log the error using the main application logger, and update the task’s status to error. The exception message will be stored in the task’s message field.
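
Since the exception message is what ends up in the message field, it pays to raise errors whose text makes sense on its own. The sketch below shows one way to do that; load_settings is a hypothetical background function, and outcome_of is a simplified stand-in for the catch-and-record behaviour described above, not the task manager's real code.

```python
import json


def load_settings(raw: str) -> dict:
    """Hypothetical background job: parse settings, failing with a clear message."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        # Re-raise with context; the original error stays attached as __cause__.
        raise ValueError(f"Settings are not valid JSON (line {exc.lineno})") from exc


def outcome_of(func, *args, **kwargs) -> dict:
    """Stand-in for the documented behaviour: catch, then report the message."""
    try:
        return {"status": "success", "result": func(*args, **kwargs)}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
```

With this pattern, a malformed input produces a message that names the problem in plain terms instead of exposing a low-level parser error to whoever reads the task details.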