# Source code for tinyecs

from collections import defaultdict
from collections.abc import Hashable, Iterable
from typing import Any, Protocol
from uuid import uuid4

# Public API of the module, in alphabetical order (types and exceptions
# first, functions after).
__all__ = [
    'Component',
    'ComponentID',
    'DomainID',
    'EntityID',
    'Property',
    'RegistryError',
    'UnknownArchetypeError',
    'UnknownComponentError',
    'UnknownEntityError',
    'UnknownSystemError',
    'add_component',
    'add_components',
    'add_system',
    'add_system_to_domain',
    'add_to_archetype',
    'cid_of_comp',
    'cids_of_eid',
    'clear_properties',
    'comp_of_eid',
    'comps_of_archetype',
    'comps_of_eid',
    'create_archetype',
    'create_entity',
    'eid_has',
    'eid_of_comp',
    'eids_by_cids',
    'eids_by_property',
    'has',
    'has_property',
    'healthcheck',
    'purge_by_property',
    'remove_archetype',
    'remove_component',
    'remove_entity',
    'remove_from_archetype',
    'remove_property',
    'remove_system',
    'remove_system_from_domain',
    'reset',
    'run_all_systems',
    'run_domain',
    'run_system',
    'set_properties',
    'set_property',
    'update_component',
]

# Public aliases.  All ID-like aliases are plain ``Hashable``; the distinct
# names only document which kind of key a parameter is meant to carry.
type EntityID = Hashable
type ComponentID = Hashable
type DomainID = Hashable
type Property = Hashable
# A component can be any object.
type Component = object

# Private aliases, used to keep the function signatures in this module short.
# (entity ID, list of that entity's components)
type _EntityComponentsBundle = tuple[EntityID, list[Component]]
# Optional property filter accepted by the query functions.
type _OptionalProperties = Iterable[Property] | None
# Per-entity results of one system run: entity ID -> system return value.
type _RunSystemResult = dict[EntityID, Any]


# ``Callable[...]`` cannot describe functions that take *args/**kwargs, so
# the system signature is spelled out as a callback protocol.
# This solution comes from https://docs.python.org/3/library/typing.html#annotating-callable-objects
class SystemFunction(Protocol):
    """Call signature of a system function.

    A system is called once per matching entity with the delta time, the
    entity ID, the entity's requested components (in the order of the
    component IDs the system was registered with) and any extra keyword
    arguments forwarded by `run_system`.

    Note: the annotation of ``*comps`` / ``**kwargs`` describes each single
    argument, not the collected tuple / dict.
    """

    def __call__(self, dt: float, eid: EntityID, *comps: Component, **kwargs: Any) -> Any: ...


# Module-level registry.  All functions below read and write these indexes.
# eidx and cidx mirror each other and must be kept in sync (see healthcheck).
eidx = {}  # entity index: eid -> {cid: component}
cidx = {}  # component id index: cid -> {eid: component}
sidx = {}  # system index: system function -> tuple of cids it was registered with
didx = {}  # domain index: domain id -> set of system functions
oidx = defaultdict(set)  # object index: id(component) -> set of eids holding that object
plist = defaultdict(set)  # entity property lists: eid -> set of properties
archetype = {}  # tuple of cids -> {eid: [components in cid order]}


class UnknownEntityError(Exception):
    """Signal that a requested entity ID is not present in the registry."""
class UnknownComponentError(Exception):
    """Signal that a requested component could not be found."""
class UnknownSystemError(Exception):
    """Signal that a requested system function is not registered."""
class UnknownArchetypeError(Exception):
    """Signal that a requested archetype does not exist."""
class RegistryError(Exception):
    """Report an inconsistency found in the internal ECS registry.

    All constructor arguments are stored unchanged as attributes of the
    same name.

    Parameters
    ----------
    error:
        Verbose error message.
    eid:
        ID of the affected entity (keyword only, required).
    cid:
        ID of the affected component, if any.
    component:
        The affected component object, if any.
    other:
        The differing object found in the other index, if any.
    properties:
        The property set of the affected entity, if any.
    """

    def __init__(self, error, *, eid, cid=None, component=None, other=None, properties=None):
        self.error, self.eid, self.cid = error, eid, cid
        self.component, self.other = component, other
        self.properties = properties
def reset() -> None:
    """Empty every index of the ECS registry.

    Entities, components, systems, domains, the object index, property
    lists and archetypes are all cleared in place.  Use this to start from
    scratch, e.g. before a new game.
    """
    for index in (eidx, cidx, sidx, didx, oidx, plist, archetype):
        index.clear()
def healthcheck() -> bool:
    """Verify that the entity and component indexes mirror each other.

    Three passes are made, and the first inconsistency found is raised:

    1. every (entity, component) pair in the entity index must exist in the
       component index and refer to the very same object,
    2. every (component, entity) pair in the component index must exist in
       the entity index,
    3. every entity that owns a property list must be a registered entity.

    NOTE: This walks the complete registry and is meant for debugging only
    (e.g. from a breakpoint), not for use inside a system.

    Returns
    -------
    bool
        True if no inconsistency was found.

    Raises
    ------
    RegistryError
        With the attributes `error` (verbose message), `eid`, `cid`,
        `component`, `other` (the object found in the other index, where
        applicable) and `properties` (for property list errors).
    """
    # Pass 1: entity index -> component index
    for eid, owned in eidx.items():
        for cid, comp in owned.items():
            if cid not in cidx:
                raise RegistryError('Component in eidx is missing in cidx',
                                    eid=eid, cid=cid, component=comp)

            holders = cidx[cid]
            if eid not in holders:
                raise RegistryError('Entity in eidx is missing in cidx component',
                                    eid=eid, cid=cid, component=comp)

            mirrored = holders[eid]
            if comp is not mirrored:
                raise RegistryError('Component object differs between eidx and cidx',
                                    eid=eid, cid=cid, component=comp, other=mirrored)

    # Pass 2: component index -> entity index
    for cid, holders in cidx.items():
        for eid, comp in holders.items():
            if eid not in eidx:
                raise RegistryError('Entity in cidx is missing in eidx',
                                    eid=eid, cid=cid, component=comp)
            if cid not in eidx[eid]:
                raise RegistryError('Component in cidx is missing in eidx',
                                    eid=eid, cid=cid, component=comp)

    # Pass 3: property lists must belong to registered entities
    for eid, props in plist.items():
        if eid not in eidx:
            raise RegistryError('Unknown entity in property list',
                                eid=eid, properties=props)

    # FIXME: marker carried over from the original code; the remaining
    # indexes (oidx, sidx, didx, archetype) are not examined here.

    return True
def create_entity(tag: EntityID | None = None,
                  components: dict[ComponentID, Component] | None = None,
                  properties: _OptionalProperties = None) -> EntityID:
    """Create a new entity.

    Parameters
    ----------
    tag:
        An optional ID for the entity, e.g. "player".  If no tag (or None)
        is passed, a uuid4 string is generated.  Any other hashable value,
        including falsy ones such as ``0`` or ``''``, is used as given.
    components:
        A dict with component IDs as keys and components as values
        (see add_component).
    properties:
        An iterable of properties.

    Returns
    -------
    Hashable
        The entity ID.  Same as given if one is passed, otherwise a uuid4.

    Note
    ----
    If `tag` is already registered, its entry in the entity index is
    replaced by an empty one; the other indexes are not touched by this
    function.
    """
    # Only a missing tag triggers ID generation.  A plain truthiness test
    # would silently replace valid IDs like 0 or '' with a uuid.
    eid = str(uuid4()) if tag is None else tag

    eidx[eid] = {}

    # Add optionally passed components and properties
    if components:
        add_components(eid, components)

    if properties:
        set_properties(eid, properties)

    return eid
def remove_entity(eid: EntityID) -> None:
    """Remove an entity and everything bound to it from the registry.

    All components of the entity are removed via `remove_component`, then
    its property list, its archetype memberships and finally the entity ID
    itself.  Unknown entity IDs are silently ignored.

    Parameters
    ----------
    eid: hashable
        The entity ID

    Returns
    -------
    None
    """
    owned = eidx.get(eid)
    if owned is not None:
        # Unpacking copies the keys before remove_component mutates the dict.
        remove_component(eid, *owned)

    plist.pop(eid, None)
    remove_from_archetype(eid)
    eidx.pop(eid, None)
def add_component(eid: EntityID, cid: ComponentID, comp: Component) -> ComponentID:
    """Register a component with an entity.

    If the entity already has a component under `cid`, it is replaced.

    Parameters
    ----------
    eid: hashable
        The entity id to add the component to
    cid: hashable
        An identifier for the component.  This is what systems and
        archetypes match entities against.
    comp: *
        The actual data object.  It can be anything that holds data: a
        string, a SimpleNamespace, a dataclass, ...

    Note: Technically, nothing prevents a comp object from having methods,
    but by concept, functionality belongs to the systems working on the
    components, not to the components themselves.

    Returns
    -------
    cid: hashable
        The `cid` that was put in as an argument.

    Raises
    ------
    UnknownEntityError
        If the `eid` doesn't exist in the registry.
    """
    if eid not in eidx:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    owned = eidx[eid]

    # When replacing a component, drop the old object's reference to this
    # entity from the object index first.
    if cid in owned:
        old_key = id(owned[cid])
        holders = oidx[old_key]
        holders.discard(eid)
        if not holders:
            del oidx[old_key]

    cidx.setdefault(cid, {})[eid] = comp
    owned[cid] = comp
    oidx[id(comp)].add(eid)

    add_to_archetype(eid)

    return cid


# Replacing a component is the same operation as adding one.
update_component = add_component
def add_components(eid: EntityID, components: dict[ComponentID, Component]) -> list[ComponentID]:
    """Add several components to an entity with one call.

    This is a convenience wrapper that calls `add_component` once per item.

    Parameters
    ----------
    eid:
        The entity to add the components to
    components:
        A dict with component IDs as keys and components as values
        (see add_component)

    Returns
    -------
    list
        The component IDs, in the iteration order of `components`.
    """
    added = []
    for cid, comp in components.items():
        added.append(add_component(eid, cid, comp))
    return added
def remove_component(eid: EntityID, *cids: ComponentID) -> None:
    """Remove one or more components from an entity.

    Parameters
    ----------
    eid:
        The entity to remove the components from
    cids:
        The component ids to remove

    If a removed component object has a `shutdown_` attribute, it is called
    without arguments after the component has been unregistered.

    Note: Unknown entities or component ids are silently ignored.

    Returns
    -------
    None
    """
    for cid in cids:
        remove_from_archetype(eid, cid)

        # One try block covers unknown cids and unknown eids alike; since
        # we're removing anyways, either case is simply skipped.
        try:
            comp = cidx[cid][eid]
            key = id(comp)

            del cidx[cid][eid]
            del eidx[eid][cid]

            holders = oidx[key]
            holders.discard(eid)
            if not holders:
                del oidx[key]
        except KeyError:
            continue

        if hasattr(comp, 'shutdown_'):
            comp.shutdown_()
def add_system(fkt: SystemFunction, *cids: ComponentID) -> None:
    """Register a system function for the given component ids.

    Parameters
    ----------
    fkt:
        The system function, with the prototype

            fkt(delta_time, eid, *comps)

        where delta_time is e.g. the milliseconds from a pygame tick, eid
        is the id of the matching entity, and *comps are the requested
        components of that entity.  It is called for every entity that has
        all specified component ids.
    *cids:
        The component ids that are required for this system

    Returns
    -------
    None

    Note
    ----
    Registering a system also creates the `archetype` for `cids` if it
    doesn't exist yet.  Registering the same function again replaces its
    component ids.
    """
    create_archetype(*cids)
    sidx[fkt] = cids
def remove_system(fkt: SystemFunction) -> None:
    """Unregister a system function.

    The function is taken out of every domain and out of the system index.
    Unregistered functions are silently ignored.

    Parameters
    ----------
    fkt:
        The system function to remove

    Returns
    -------
    None

    Note
    ----
    The `archetype` that `add_system` created for this function is left in
    place.
    """
    for members in didx.values():
        members.discard(fkt)

    sidx.pop(fkt, None)
def add_system_to_domain(domain: DomainID, system: SystemFunction) -> None:
    """Create or extend a domain with the given system.

    Domains are collections of systems that can be run with a single call
    to `run_domain`.  The system must already be registered with
    `add_system`:

        add_system_to_domain('render-phase', draw_system)

    Parameters
    ----------
    domain: hashable
        The ID of the domain.  It is created if it doesn't exist yet.
    system: Callable
        The system function

    Returns
    -------
    None

    Raises
    ------
    UnknownSystemError
        If `system` has not been registered with `add_system`.  In that
        case the domain index is left untouched.
    """
    # Validate before touching didx, so a failed call doesn't leave an empty
    # domain behind.
    if system not in sidx:
        raise UnknownSystemError(f'system {system} is not registered')

    didx.setdefault(domain, set()).add(system)
def remove_system_from_domain(domain: DomainID, system: SystemFunction) -> None:
    """Take a system out of the given domain.

    Parameters
    ----------
    domain: hashable
        The ID of the domain
    system: Callable
        The system function

    Returns
    -------
    None

    Note
    ----
    An unknown domain, or a system that is not part of the domain, is
    silently ignored.  The domain itself stays registered, even if it
    becomes empty.
    """
    members = didx.get(domain)
    if members is not None:
        members.discard(system)
def has(eid: EntityID, has_properties: _OptionalProperties = None) -> bool:
    """Check if the given eid is registered, optionally with properties.

    Without a property filter this is the same as `eid in tinyecs.eidx`;
    it exists for people who prefer a functional interface.

    Parameters
    ----------
    eid
        The entity to verify
    has_properties
        Optional iterable of properties the entity must have

    Returns
    -------
    bool
        True if the eid is registered and has all required properties
    """
    if has_properties is None:
        return eid in eidx

    required = set(has_properties)
    if eid not in eidx:
        return False
    return required <= plist[eid]
def eid_has(eid: EntityID, *cids: ComponentID) -> bool:
    """Check if entity eid has all listed cids.

    Parameters
    ----------
    eid:
        The entity to inspect
    *cids:
        All component ids that need to match

    Returns
    -------
    bool
        True if all given cids are available for the specified eid.
        With no cids given, the result is True.

    Raises
    ------
    KeyError
        If the entity is not registered (plain index lookup).
    """
    owned = eidx[eid]
    return all(cid in owned for cid in cids)
def eids_by_cids(*cids: ComponentID, has_properties: _OptionalProperties = None) -> list[tuple[EntityID, list[Component]]]:
    """Get the entities that have all specified cids.

    If an archetype exists for exactly this sequence of cids, the result is
    taken from it.  Otherwise all entities are searched.

    Parameters
    ----------
    *cids:
        All component ids that need to match
    has_properties: set|tuple|list
        Optional set/tuple/list of required properties

    Returns
    -------
    list
        A list of tuples of entity ID and the list of its components, in
        the order of `cids`
    """
    if tuple(cids) in archetype:
        return comps_of_archetype(*cids, has_properties=has_properties)

    # None means "no filtering"; an empty/falsy filter is treated the same.
    required = set(has_properties) if has_properties else None

    matches = []
    for eid, owned in eidx.items():
        if required is not None and not required <= plist[eid]:
            continue
        if all(cid in owned for cid in cids):
            matches.append((eid, [owned[cid] for cid in cids]))

    return matches
def cids_of_eid(eid: EntityID) -> list[ComponentID]:
    """Return the component IDs registered with the specified entity.

    Parameters
    ----------
    eid: hashable
        The entity ID

    Returns
    -------
    list
        A new list with the component IDs of this entity

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    """
    owned = eidx.get(eid)
    if owned is None:
        raise UnknownEntityError(f'Entity {eid} is not registered')
    return list(owned)
def comps_of_eid(eid: EntityID, *cids: ComponentID) -> list[Component]:
    """Get the component objects of an entity.

    Where `cids_of_eid` returns component IDs, this function returns the
    actual components holding the data.  If no cids are given, all
    components of the entity are returned.

    Parameters
    ----------
    eid:
        the entity id from which to get the components
    cids:
        the component ids to fetch, in the desired order

    Returns
    -------
    list
        The (filtered) components of eid

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    UnknownComponentError
        If one of the given cids is not registered with the entity.
    """
    owned = eidx.get(eid)
    if owned is None:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    if not cids:
        return list(owned.values())

    try:
        return [owned[cid] for cid in cids]
    except KeyError as e:
        raise UnknownComponentError(f'Component {e} not registered with entity {eid}') from e
def comp_of_eid(eid: EntityID, cid: ComponentID) -> Component:
    """Get a single component from an entity.

    This delegates to `comps_of_eid` with a single cid.

    Parameters
    ----------
    eid:
        the entity id from which to get the component
    cid:
        the id of the component to fetch

    Returns
    -------
    component: object
        The requested component of the given entity ID
    """
    (comp,) = comps_of_eid(eid, cid)
    return comp
def eid_of_comp(comp: Component) -> set[EntityID]:
    """Find the entities that hold the object comp.

    The lookup is by object identity (`id(comp)`), not by equality.

    Parameters
    ----------
    comp:
        the component to find the entities of

    Returns
    -------
    set
        The IDs of all entities the given component object is registered
        with.  For a registered component this is the registry's own set,
        so treat it as read-only.  For an unknown component, an empty set.
    """
    # .get() instead of indexing: oidx is a defaultdict, and indexing would
    # insert an empty entry for every unknown object that is asked for.
    return oidx.get(id(comp), set())
def cid_of_comp(eid: EntityID, comp: Component) -> ComponentID:
    """Get the cid under which a component is registered with an entity.

    A system only receives the component object itself and cannot know
    under which id it was registered.  This function searches the
    components of `eid` for the given object (by identity) and returns the
    id of the first match.

    Note: This is a linear search over the entity's components.

    Parameters
    ----------
    eid: hashable
        The entity ID
    comp: object
        The component to identify

    Returns
    -------
    hashable
        The cid that the given component was added under.

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    UnknownComponentError
        If the passed component couldn't be found.
    """
    if eid not in eidx:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    for cid, candidate in eidx[eid].items():
        if candidate is comp:
            return cid

    raise UnknownComponentError(f'Component {comp} not found in entity {eid}')
def run_system(dt: float, fkt: SystemFunction, *cids: ComponentID,
               has_properties: _OptionalProperties = None,
               **kwargs: Any) -> _RunSystemResult:
    """Run the system for all entities matching the cids.

    The archetype for `cids` is looked up (and created on first use), then
    `fkt` is called once per member entity as

        fkt(dt, eid, *components, **kwargs)

    This function is a direct call.  Alternatively, you can use
    `add_system` combined with `run_all_systems` or `run_domain`.

    Parameters
    ----------
    dt:
        delta time since the last frame (milliseconds)
    fkt:
        the actual system function
    *cids:
        the component ids to run on
    has_properties:
        optional iterable of properties; only entities having all of them
        are processed
    **kwargs:
        passed through to every call of `fkt`

    Returns
    -------
    dict
        A dictionary with entity IDs as key and the function result as value
    """
    at = tuple(cids)
    if at not in archetype:
        create_archetype(*cids)

    members = archetype[at]

    # The call list is built upfront, since the system function may add or
    # remove entities/components and thereby modify the archetype dict
    # while we are iterating.
    # Also: skipping the filter entirely is vastly faster than comparing
    # against an empty set for every entity.
    if has_properties:
        property_filter = set(has_properties)
        call_list = [(eid, *parms) for eid, parms in members.items()
                     if property_filter <= plist[eid]]
    else:
        call_list = [(eid, *parms) for eid, parms in members.items()]

    return {eid: fkt(dt, eid, *parms, **kwargs) for eid, *parms in call_list}
def run_all_systems(dt: float) -> dict[SystemFunction, _RunSystemResult]:
    """Run every registered system.

    `run_system` is called for each system in the system index, with the
    component ids the system was registered with.

    Parameters
    ----------
    dt:
        delta time

    Returns
    -------
    dict
        A dict of function as key, and the result of run_system as value
    """
    results = {}
    for fkt, comps in sidx.items():
        results[fkt] = run_system(dt, fkt, *comps)
    return results
def run_domain(dt: float, domain: DomainID) -> dict[SystemFunction, _RunSystemResult]:
    """Run all systems within domain.

    This is the same as `run_all_systems`, but limited to the systems of
    one domain.

    Parameters
    ----------
    dt:
        delta time
    domain:
        the ID of the domain to run

    Returns
    -------
    dict
        A dict of function as key, and the result of run_system as value.
        Empty if the domain is unknown.
    """
    members = didx.get(domain)
    if members is None:
        return {}

    results = {}
    for fkt in members:
        results[fkt] = run_system(dt, fkt, *sidx[fkt])
    return results
def create_archetype(*cids: ComponentID) -> None:
    """Create an archetype from the provided cids.

    An archetype is a fixed combination of components together with a
    cache of all entities that currently have this combination.  The cache
    is updated whenever a component is added to or removed from an entity,
    so a system run can use the finished list instead of searching all
    entities each time.  Since systems usually run every frame while
    entities change comparatively rarely, this trades a small cost on
    change for a large saving on every run.

    If the archetype already exists, nothing happens.

    Note
    ----
    `add_system` and `run_system` create their archetype automatically.
    Creating one manually is only useful for code that wants to work on a
    set of entities outside the `run_system` framework.

    Parameters
    ----------
    cids
        The cids that define the archetype.

        Note: Order is important, since it determines the order of the
        components handed to the system.

    Returns
    -------
    None
    """
    key = tuple(cids)
    if key not in archetype:
        archetype[key] = {eid: comps for eid, comps in eids_by_cids(*cids)}
def remove_archetype(cids: Iterable[ComponentID]) -> None:
    """Remove an archetype from the system.

    (See `create_archetype`).  Note that, unlike there, the cids are passed
    as a single iterable.

    Parameters
    ----------
    cids
        The component IDs that make up the archetype, in order.

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If no such archetype exists.
    """
    archetype.pop(tuple(cids))
def add_to_archetype(eid: EntityID) -> None:
    """Make sure eid is registered with all appropriate archetypes.

    For every archetype whose cids are all present on the entity, the
    entity's entry is (re)written with its current components, in the
    order of the archetype's cids.

    There should be no need to call this function manually.  It's called
    internally when a component is added to an entity.

    Parameters
    ----------
    eid
        The entity ID to add to the archetypes

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    """
    if eid not in eidx:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    owned = eidx[eid]

    for at, members in archetype.items():
        if all(cid in owned for cid in at):
            # Build the list directly from the archetype's cids.  Going
            # through comps_of_eid would return *all* components for the
            # empty archetype, which disagrees with eids_by_cids().
            members[eid] = [owned[cid] for cid in at]
def remove_from_archetype(eid: EntityID, cid: ComponentID | None = None) -> None:
    """Take eid out of the archetypes it no longer belongs to.

    There should be no need to call this function manually.  It's called
    internally when a component or a whole entity is removed.

    Parameters
    ----------
    eid
        The entity ID to remove from the archetypes
    cid
        An optional component ID.  If given, the entity is only removed
        from archetypes that contain this cid; otherwise from all of them.
    """
    for signature, members in archetype.items():
        if cid is not None and cid not in signature:
            continue
        members.pop(eid, None)
def comps_of_archetype(*cids: ComponentID, has_properties: _OptionalProperties = None) -> list[_EntityComponentsBundle]:
    """Return the members of the given archetype.

    Parameters
    ----------
    cids
        The cids that define the archetype.
    has_properties
        Optional iterable of required properties; if given (and not
        empty), only entities having all of them are returned.

    Returns
    -------
    List[tuple[Hashable, list[object]]]
        A list of tuples of (eid, components)

    Raises
    ------
    UnknownArchetypeError
        If the given archetype doesn't exist.
    """
    key = tuple(cids)
    if key not in archetype:
        raise UnknownArchetypeError

    members = archetype[key]

    if not has_properties:
        return list(members.items())

    required = set(has_properties)
    return [(eid, comps) for eid, comps in members.items()
            if required <= plist[eid]]
def set_property(eid: EntityID, property: Property) -> None:
    """Attach a property to the given entity.

    Parameters
    ----------
    eid: hashable
        The entity id to add the property to
    property: hashable
        A flag or tag that can be filtered for, e.g. 'is_drawable'

    Returns
    -------
    None

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    """
    if eid not in eidx:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    entity_props = plist[eid]
    entity_props.add(property)
def set_properties(eid: EntityID, properties: Iterable[Property]) -> None:
    """Attach several properties to the given entity.

    Each property is added exactly as `set_property` would add it.

    Parameters
    ----------
    eid
        The entity ID to add the properties to.
    properties:
        The properties to add.

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore) and at least one
        property was given.
    """
    for prop in properties:
        # The entity check stays inside the loop: with no properties given,
        # an unknown eid is not an error.
        if eid not in eidx:
            raise UnknownEntityError(f'Entity {eid} is not registered')
        plist[eid].add(prop)
def has_property(eid: EntityID, prop: Property) -> bool:
    """Check if the given entity has that property.

    Parameters
    ----------
    eid: hashable
        The entity id to inspect
    prop: hashable
        The property to check for

    Returns
    -------
    bool
        True if the property is set on the entity

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    """
    if eid not in eidx:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    entity_props = plist[eid]
    return prop in entity_props
def remove_property(eid: EntityID, prop: Property) -> None:
    """Remove a property from the given entity.

    Parameters
    ----------
    eid: hashable
        The entity id to remove the property from
    prop: hashable
        The property to remove

    Returns
    -------
    None

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    KeyError
        If the entity doesn't have this property.
    """
    if eid not in eidx:
        raise UnknownEntityError(f'Entity {eid} is not registered')

    entity_props = plist[eid]
    entity_props.remove(prop)
def clear_properties(eid: EntityID) -> None:
    """Remove all properties from the given entity.

    The entity's property set is replaced by a fresh empty set.

    Parameters
    ----------
    eid: hashable
        The entity id to clear the properties of

    Returns
    -------
    None

    Raises
    ------
    UnknownEntityError
        If the entity is not registered (anymore).
    """
    if eid in eidx:
        plist[eid] = set()
        return

    raise UnknownEntityError(f'Entity {eid} is not registered')
def eids_by_property(*properties: Property) -> list[EntityID]:
    """Get the entities that have all given properties.

    Only entities with an entry in the property index are considered.

    Parameters
    ----------
    *properties:
        All properties that need to match

    Returns
    -------
    list[EntityID]
        Entities matching the given properties
    """
    required = set(properties)

    matches = []
    for eid, entity_props in plist.items():
        if required.issubset(entity_props):
            matches.append(eid)
    return matches
def purge_by_property(*properties: Property) -> None:
    """Remove every entity that has all given properties.

    This is useful to clean up all entities belonging to a sub state
    without impacting the remaining registry.

    Parameters
    ----------
    *properties:
        All properties that need to match
    """
    # eids_by_property returns a new list, so removing while looping is safe.
    doomed = eids_by_property(*properties)
    for eid in doomed:
        remove_entity(eid)