Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 84 additions & 35 deletions pyaml/common/element.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, ConfigDict

Expand All @@ -12,21 +12,45 @@ def __pyaml_repr__(obj):
"""
Returns a string representation of a pyaml object
"""
if hasattr(obj, "_cfg"):

cls_name = obj.__class__.__name__

# Keep the old behavior when _cfg exists
cfg = getattr(obj, "_cfg", None)
if cfg is not None:
if isinstance(obj, Element):
return repr(obj._cfg).replace(
return repr(cfg).replace(
"ConfigModel(",
obj.__class__.__name__ + "(peer='" + obj.attached_to() + "', ",
f"{cls_name}(peer={obj.attached_to()!r}, ",
1,
)
else:
# no peer
return repr(obj._cfg).replace("ConfigModel", obj.__class__.__name__)
else:
# Object is not yet fully constructed
if isinstance(obj, Element):
return f"{obj.__class__.__name__}: {obj.get_name()}"
else:
return f"{obj.__class__.__name__}"
return repr(cfg).replace("ConfigModel", cls_name, 1)

# Generic fallback when there is no _cfg
attrs = {}

# Instance attributes
for k, v in obj.__dict__.items():
# Exclude private attributes
if not k.startswith("_"):
attrs[k] = v

# Properties
for name, attr in vars(type(obj)).items():
if isinstance(attr, property):
try:
attrs[name] = getattr(obj, name)
except Exception as e:
attrs[name] = f"<error: {e}>"

if isinstance(obj, Element) and "name" not in attrs:
try:
attrs["name"] = obj.get_name()
except Exception as e:
attrs["name"] = f"<error: {e}>"

parts = ", ".join(f"{k}={v!r}" for k, v in attrs.items())
return f"{cls_name}({parts})" if parts else cls_name


class ElementConfigModel(BaseModel):
Expand Down Expand Up @@ -57,39 +81,64 @@ class ElementConfigModel(BaseModel):
lattice_names: str | None = None


class Element(object):
class Element:
"""
Class providing access to one element of a physical or simulated lattice

Attributes:
name: str
The unique name identifying the element in the configuration file
"""

def __init__(self, name: str):
self._name: str = name
self._peer: "ElementHolder" = None # Peer: ControlSystem, Simulator
def __init__(
self,
name: str,
lattice_names: str | None = None,
description: str | None = None,
):
self._name = name
self._lattice_names = lattice_names
self._description = description
self._peer: ElementHolder | None = None

def get_name(self) -> str:
def _cfg_value(self, attr: str, fallback: Any) -> Any:
"""
Returns the name of the element
Return an attribute from _cfg if available, otherwise fallback.
"""
return self._name
cfg = getattr(self, "_cfg", None)
if cfg is not None:
value = getattr(cfg, attr, None)
if value is not None:
return value
return fallback

def get_lattice_names(self) -> str:
"""
Returns the name of associated lattice element(s)
"""
if not hasattr(self, "_cfg"):
return self._name
else:
return self._cfg.lattice_names
@property
def name(self) -> str:
return self._cfg_value("name", self._name)

@property
def lattice_names(self) -> str:
cfg = getattr(self, "_cfg", None)

if cfg is not None and cfg.lattice_names is not None:
return cfg.lattice_names

if self._lattice_names is not None:
return self._lattice_names

return self.name

def get_description(self) -> str:
@property
def description(self) -> str | None:
return self._cfg_value("description", self._description)

def get_name(self) -> str:
"""
Returns the description of the element
Returns the name of the element
"""
return self._cfg.description
return self.name

def get_lattice_names(self) -> str | None:
return self.lattice_names

def get_description(self) -> str | None:
return self.description

def set_energy(self, E: float):
"""
Expand Down
2 changes: 1 addition & 1 deletion pyaml/common/element_holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def get_rf_plant(self, name: str) -> RFPlant:
def add_rf_plant(self, rf: RFPlant):
self.__add(self.__RFPLANT, rf)

def add_rf_transnmitter(self, rf: RFTransmitter):
def add_rf_transmitter(self, rf: RFTransmitter):
self.__add(self.__RFTRANSMITTER, rf)

def get_rf_trasnmitter(self, name: str) -> RFTransmitter:
Expand Down
14 changes: 8 additions & 6 deletions pyaml/configuration/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class BuildInfo:
----------
module : ModuleType
Imported module containing the object class and validation model.
config_cls : type[BaseModel]
config_cls : type[BaseModel], optional
Pydantic model used to validate the configuration.
class_str : str
Name of the class to instantiate.
Expand All @@ -174,7 +174,7 @@ class BuildInfo:
"""

module: ModuleType
config_cls: type[BaseModel]
config_cls: type[BaseModel] | None
class_str: str
field_locations: dict | None
location_str: str
Expand Down Expand Up @@ -248,8 +248,6 @@ def resolve_build_info(data: dict, ignore_external: bool) -> BuildInfo | None:

# Get the validation class
config_cls = getattr(module, validation_class_str, None)
if config_cls is None:
raise PyAMLConfigException(f"No validation class for '{module.__name__}.{class_str}' {location_str}")

return BuildInfo(
module=module,
Expand Down Expand Up @@ -456,6 +454,8 @@ def _construct_element(

try:
if control_modes is None:
if isinstance(cfg, dict):
return elem_cls(**cfg)
return elem_cls(cfg)

return UnboundElement(elem_cls, module_name, control_modes, cfg)
Expand Down Expand Up @@ -495,9 +495,11 @@ def build_object(self, data: dict, ignore_external: bool = False):

cleaned_data, control_modes = self._strip_build_metadata(data)

# Validate the model
try:
cfg = config_cls.model_validate(cleaned_data)
if config_cls is not None:
cfg = config_cls.model_validate(cleaned_data)
else:
cfg = cleaned_data
except ValidationError as e:
handle_validation_error(e, module.__name__, location_str, field_locations)

Expand Down
14 changes: 7 additions & 7 deletions pyaml/control/controlsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,19 @@ def fill_device(self, elements: list[Element]):

elif isinstance(e, RFPlant):
attachedTrans: list[RFTransmitter] = []
if e._cfg.transmitters:
for t in e._cfg.transmitters:
vDev = self.get_device_access(t._cfg.voltage)
pDev = self.get_device_access(t._cfg.phase)
if e.transmitters:
for t in e.transmitters:
vDev = self.get_device_access(t.voltage_name)
pDev = self.get_device_access(t.phase_name)
voltage = RWRFVoltageScalar(t, vDev)
phase = RWRFPhaseScalar(t, pDev)
nt = t.attach(self, voltage, phase)
self.add_rf_transnmitter(nt)
self.add_rf_transmitter(nt)
attachedTrans.append(nt)

fDev = self.get_device_access(e._cfg.masterclock)
fDev = self.get_device_access(e.masterclock)
frequency = RWRFFrequencyScalar(e, fDev)
voltage = RWTotalVoltage(attachedTrans) if e._cfg.transmitters else None
voltage = RWTotalVoltage(attachedTrans) if e.transmitters else None
ne = e.attach(self, frequency, voltage)
self.add_rf_plant(ne)

Expand Down
10 changes: 5 additions & 5 deletions pyaml/lattice/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,25 @@ def fill_device(self, elements: list[Element]):
self.add_bpm(e)

elif isinstance(e, RFPlant):
if e._cfg.transmitters:
if e.transmitters:
cavs: list[at.Element] = []
harmonics: list[float] = []
attachedTrans: list[RFTransmitter] = []
for t in e._cfg.transmitters:
for t in e.transmitters:
cavsPerTrans: list[at.Element] = []
for c in t._cfg.cavities:
for c in t.cavities:
# Expect unique name for cavities
cav = self.get_at_elems(Element(c))
if len(cav) > 1:
raise PyAMLException(f"RF transmitter {t.get_name()},multiple cavity definition:{{cav[0]}}")
if len(cav) == 0:
raise PyAMLException(f"RF transmitter {t.get_name()}, No cavity found")
cavsPerTrans.append(cav[0])
harmonics.append(t._cfg.harmonic)
harmonics.append(t.harmonic)
voltage = RWRFVoltageScalar(cavsPerTrans)
phase = RWRFPhaseScalar(cavsPerTrans)
nt = t.attach(self, voltage, phase)
self.add_rf_transnmitter(nt)
self.add_rf_transmitter(nt)
cavs.extend(cavsPerTrans)
attachedTrans.append(nt)

Expand Down
51 changes: 24 additions & 27 deletions pyaml/rf/rf_plant.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,46 @@
import numpy as np
from pydantic import BaseModel, ConfigDict

try:
from typing import Self # Python 3.11+
except ImportError:
from typing_extensions import Self # Python 3.10 and earlier
import copy
from typing import Self

from .. import PyAMLException
from ..common import abstract
from ..common.element import Element, ElementConfigModel
from ..control.deviceaccess import DeviceAccess
from ..common.element import Element
from ..validation import DynamicValidation
from .rf_transmitter import RFTransmitter

# Define the main class name for this module
PYAMLCLASS = "RFPlant"


class ConfigModel(ElementConfigModel):
masterclock: str | None = None
"""Device to apply main RF frequency"""
transmitters: list[RFTransmitter] | None = None
"""List of RF trasnmitters"""


class RFPlant(Element):
class RFPlant(Element, DynamicValidation):
"""
Main RF object
"""

def __init__(self, cfg: ConfigModel):
super().__init__(cfg.name)
self._cfg = cfg
def __init__(
self,
name: str,
masterclock: str | None = None,
transmitters: list[RFTransmitter] | None = None,
lattice_names: str | None = None,
description: str | None = None,
):
super().__init__(name, lattice_names, description)

self.masterclock = masterclock
self.transmitters = transmitters
self.__frequency = None
self.__voltage = None

@property
def frequency(self) -> abstract.ReadWriteFloatScalar:
if self.__frequency is None:
raise PyAMLException(f"{str(self)} has no masterclock device defined")
raise PyAMLException(f"{str(self.name)} has no masterclock device defined")
return self.__frequency

@property
def voltage(self) -> abstract.ReadWriteFloatScalar:
if self.__voltage is None:
raise PyAMLException(f"{str(self)} has no trasmitter device defined")
raise PyAMLException(f"{str(self.name)} has no transmitter device defined")
return self.__voltage

def attach(
Expand All @@ -53,7 +50,7 @@ def attach(
voltage: abstract.ReadWriteFloatScalar,
) -> Self:
# Attach frequency attribute and returns a new reference
obj = self.__class__(self._cfg)
obj = copy.copy(self)
obj.__frequency = frequency
obj.__voltage = voltage
obj._peer = peer
Expand All @@ -76,19 +73,19 @@ def get(self) -> float:
sum = 0
# Count only fundamental harmonic
for t in self.__trans:
if t._cfg.harmonic == 1.0:
if t.harmonic == 1.0:
sum += t.voltage.get()
return sum

def set(self, value: float):
# Assume that sum of transmitter (fundamental harmonic) distribution is 1
for t in self.__trans:
if t._cfg.harmonic == 1.0:
v = value * t._cfg.distribution
if t.harmonic == 1.0:
v = value * t.distribution
t.voltage.set(v)

def set_and_wait(self, value: float):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return self.__trans[0]._cfg.phase.unit()
return self.__trans[0].phase_device_access.unit()
Loading
Loading