Source code for ewoksserver.app.models
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
import logging
from pydantic import Field, field_validator
from pydantic import BaseModel
logger = logging.getLogger(__name__)
[docs]class EwoksSchedulingType(str, Enum):
Local = "local"
Celery = "celery"
[docs]class EwoksDiscoverySettings(BaseModel):
on_start_up: bool = Field(default=False, title="Discover ewoks tasks on startup")
timeout: Optional[float] = Field(
default=None, title="Timeout for task discovery (in seconds)"
)
[docs]class EwoksExecutionSettings(BaseModel):
handlers: List[Dict] = Field(default=list(), title="Ewoks execution handlers")
[docs]class EwoksJobSettings(BaseModel):
type: EwoksSchedulingType = EwoksSchedulingType.Local
configuration: dict = dict()
[docs]class EwoksSettings(BaseModel):
configured: bool = Field(
default=False, title="Config or resource directory have been defined"
)
resource_directory: Path = Field(
default=Path("."), title="Backend file resource directory"
)
without_events: bool = Field(default=False, title="Enable ewoks events")
ewoks_discovery: EwoksDiscoverySettings = Field(
default=None, title="Ewoks discovery settings", validate_default=True
)
ewoks_execution: EwoksExecutionSettings = Field(
default=None, title="Ewoks execution settings", validate_default=True
)
ewoks_scheduling: EwoksJobSettings = Field(
default=None, title="Ewoks job scheduling settings", validate_default=True
)
[docs] @field_validator(
"ewoks_discovery", "ewoks_execution", "ewoks_scheduling", mode="before"
)
@classmethod
def set_default_value(cls, input_value):
if input_value is None:
return dict()
return input_value
[docs]class AppSettings(BaseModel):
no_older_versions: bool = Field(
default=False, title="Do not create end points for older API versions"
)