from collections import defaultdict
from collections.abc import Hashable, Iterable
from typing import Any, Protocol
from uuid import uuid4
__all__ = ['Component', 'ComponentID', 'DomainID', 'EntityID', 'Property',
'RegistryError', 'UnknownArchetypeError', 'UnknownComponentError',
'UnknownEntityError', 'UnknownSystemError', 'add_component',
'add_components', 'add_system', 'add_system_to_domain',
'add_to_archetype', 'cid_of_comp', 'cids_of_eid',
'clear_properties', 'comp_of_eid', 'comps_of_archetype',
'comps_of_eid', 'create_archetype', 'create_entity', 'eid_has',
'eid_of_comp', 'eids_by_cids', 'eids_by_property', 'has',
'has_property', 'healthcheck', 'purge_by_property',
'remove_archetype', 'remove_component', 'remove_entity',
'remove_from_archetype', 'remove_property', 'remove_system',
'remove_system_from_domain', 'reset', 'run_all_systems',
'run_domain', 'run_system', 'set_properties', 'set_property',
'update_component',]
type EntityID = Hashable
type ComponentID = Hashable
type DomainID = Hashable
type Property = Hashable
type Component = object
type _EntityComponentsBundle = tuple[EntityID, list[Component]]
type _OptionalProperties = Iterable[Property] | None
type _RunSystemResult = dict[EntityID, Any]
# The typehinting system can't work with functions using *args, **kwargs.
# This solution comes from https://docs.python.org/3/library/typing.html#annotating-callable-objects
class SystemFunction(Protocol):
def __call__(self, dt: float, eid: EntityID, *cids: ComponentID, **kwargs: dict[str, object]) -> Any: ...
eidx = {} # entity index
cidx = {} # component id index
sidx = {} # system index
didx = {} # domain index
oidx = defaultdict(set) # object index
plist = defaultdict(set) # entity property lists
archetype = {}
[docs]
class UnknownEntityError(Exception):
"""Raised if information from an unknown entity is requested."""
pass
[docs]
class UnknownComponentError(Exception):
"""Raised if an unknown component is requested."""
pass
[docs]
class UnknownSystemError(Exception):
"""Raised if an unknown system is requested."""
pass
[docs]
class UnknownArchetypeError(Exception):
"""Raised if an unknown archetype is requested."""
pass
[docs]
class RegistryError(Exception):
"""Raised if an inconsistency is found in the internal ecs registry.
:param error: The actual error message
:param eid: The entity ID of the defective registry entry
:param cid: The component ID of the defective registry entry
:param component: The component object of the defective registry entry
:param eid: The properties of the defective registry entry
Contains an error message and the registry objects that have been
identified as inconsistent.
"""
def __init__(self, error, *, eid, cid=None, component=None, other=None, properties=None):
self.error = error
self.eid = eid
self.cid = cid
self.component = component
self.other = other
self.properties = properties
[docs]
def reset() -> None:
"""Remove everything registered in the ECS.
Use this to clear everything, e.g. before a new game or when restarting a
game state.
"""
eidx.clear()
cidx.clear()
sidx.clear()
didx.clear()
oidx.clear()
plist.clear()
archetype.clear()
[docs]
def healthcheck() -> bool:
"""Perform a health check on the registry.
:returns: True if successful, RegistryError exception otherwise.
:raises RegistryError: If inconsistencies are found in the registry.
This function checks if the cross references between the entity index and
the component index are still bi-directional.
.. note:: This function is performance heavy and not designed to use in a
system. It's intended for debugging purposes only, e.g. after setting a
breakpoint.
"""
for eid in eidx:
for cid in eidx[eid]:
if cid not in cidx:
raise RegistryError('Component in eidx is missing in cidx', eid=eid, cid=cid, component=eidx[eid][cid])
if eid not in cidx[cid]:
raise RegistryError('Entity in eidx is missing in cidx component', eid=eid, cid=cid, component=eidx[eid][cid])
if eidx[eid][cid] is not cidx[cid][eid]:
raise RegistryError('Component object differs between eidx and cidx', eid=eid, cid=cid, component=eidx[eid][cid], other=cidx[cid][eid])
for cid in cidx:
for eid in cidx[cid]:
if eid not in eidx:
raise RegistryError('Entity in cidx is missing in eidx', eid=eid, cid=cid, component=cidx[cid][eid])
if cid not in eidx[eid]:
raise RegistryError('Component in cidx is missing in eidx', eid=eid, cid=cid, component=cidx[cid][eid])
for eid in plist:
if eid not in eidx:
raise RegistryError('Unknown entity in property list', eid=eid, properties=plist[eid])
... # FIXME
return True
[docs]
def create_entity(tag: EntityID = None,
components: dict[ComponentID, Component] | None = None,
properties: _OptionalProperties = None) -> EntityID:
"""Create a new entity.
:param tag: an optional ID for the entity, e.g. "player" if no tag is passed, a uuid is generated
:param components: A dict with component IDs as keys and components as values (see add_component)
:param properties: An iterable of properties
:return: The entity ID. Same as given if one is passed, otherwise a uuid4
"""
eid = tag if tag else str(uuid4())
if eid not in eidx:
eidx[eid] = {}
# Add optionally passed components
if components:
add_components(eid, components)
if properties:
set_properties(eid, properties)
return eid
[docs]
def remove_entity(eid: EntityID) -> None:
"""Remove an entity from the system.
:param eid: The entity ID
:returns: None
Removes all bindings to components and the entity_id itself from the
registry.
.. note:: Non-existent entity_ids will be silently ignored.
"""
# Ignore unknown eids, since we're removing anyways
try:
cids = eidx[eid].keys()
except KeyError:
pass
else:
remove_component(eid, *cids)
try:
del plist[eid]
except KeyError:
pass
remove_from_archetype(eid)
try:
del eidx[eid]
except KeyError:
pass
[docs]
def add_component(eid: EntityID, cid: ComponentID, comp: Component) -> ComponentID:
"""Add a component to the registry.
:param eid: The entity id to add the component to
:param cid: An identifier for the component This will be used to assign systems to entities
:param comp: The actual data object A comp can be anything that holds data, just a string, e.g. a name, a SimpleNamespace, a dataclass, ...
:return: The `cid` that was put in as an argument.
:raises UnknownEntityError: If the `eid` doesn't exist in the registry.
.. note:: Technically, there is no reason a comp object couldn't
have methods, but by concept, functionality is reserved for the System
working on the components, not the component itself.
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
if cid not in cidx:
cidx[cid] = {}
# Make sure, when replacing a component, the the old one is removed from
# oidx for this entity.
try:
old_comp = id(eidx[eid][cid])
oidx[old_comp].discard(eid)
if not oidx[old_comp]:
del oidx[old_comp]
except KeyError:
pass
cidx[cid][eid] = comp
eidx[eid][cid] = comp
oidx[id(comp)].add(eid)
add_to_archetype(eid)
return cid
update_component = add_component
[docs]
def add_components(eid: EntityID, components: dict[ComponentID, Component]) -> list[ComponentID]:
"""A convenience wrapper around add_component.
:param components: A dict with component IDs as keys and components as values (see add_component)
:return: None
"""
return [add_component(eid, cid, comp) for cid, comp in components.items()]
[docs]
def remove_component(eid: EntityID, *cids: ComponentID) -> None:
r"""Remove one or more components from an entity.
:param eid: The entity to remove the component from
:param cids: The component ids to remove
:return: None
If the component has a shutdown\_ attribute, it is assumed to be a list of
zero parameter functions to be called in order.
.. note:: If the entity no longer exists, the error is silently ignored.
"""
for cid in cids:
remove_from_archetype(eid, cid)
# Ignore unknown cids or eids since we're removing anyways
# Also, no need to try each on their own
try:
obj = cidx[cid][eid]
obj_id = id(obj)
del cidx[cid][eid]
del eidx[eid][cid]
oidx[obj_id].discard(eid)
if not oidx[obj_id]:
del oidx[id(obj)]
except KeyError:
pass
else:
if hasattr(obj, 'shutdown_'):
obj.shutdown_()
[docs]
def add_system(fkt: SystemFunction, *cids: ComponentID) -> None:
r"""Add a system for the specifiied cids.
:param fkt: The system function
:param cids: The component ids that are required for this system
:return: None
The prototype for the function is::
fkt(delta_time, eid, *comps)
where delta_time is e.g. the miliseconds from a pygame tick. eid is the id
of the entity that matches, and \*comps are all requested components for
this specific entity.
This function is called for every entity that matches all specified
component ids.
.. note:: Registering a system automatically creates an `archetype` from the given `cids`
"""
create_archetype(*cids)
sidx[fkt] = cids
[docs]
def remove_system(fkt: SystemFunction) -> None:
"""Remove the given function from the registry.
:param fkt: The system function
:return: None
Remove the match for this function from the registry
.. note:: In contrast to `add_system`, an existing `archetype` is not automatically removed.
"""
for domain in didx:
remove_system_from_domain(domain, fkt)
# Ignore unregistered systems, since we're removing anyways
try:
del sidx[fkt]
except KeyError:
pass
[docs]
def add_system_to_domain(domain: DomainID, system: SystemFunction) -> None:
"""Create or extend a domain with the given system.
:param system: The system function
:return: None
Domains are collections of systems that can run with a single function
call run_domain.
.. important:: The system must first be registered with add_system.
"""
if domain not in didx:
didx[domain] = set()
if system not in sidx:
raise UnknownSystemError(f'system {system} is not registered')
didx[domain].add(system)
[docs]
def remove_system_from_domain(domain: DomainID, system: SystemFunction) -> None:
"""Remove a registered system from the given domain.
:param system: The system function
:return: None
.. note:: If the system is not in the domain, the error is silently ignored.
"""
if domain not in didx:
return
try:
didx[domain].remove(system)
except KeyError:
pass
[docs]
def has(eid: EntityID, has_properties: _OptionalProperties = None) -> bool:
"""Check if the given eid is valid.
:param eid: The entity to verify
:param has_properties: Optional set of required properties
:return: True if the eid is valid
.. note:: There is no reason to not use `eid in tinyecs.eidx`. This is
just for people who prefer a functional interface.
"""
if has_properties is None:
return eid in eidx
property_filter = set(has_properties)
return eid in eidx and property_filter <= plist[eid]
[docs]
def eid_has(eid: EntityID, *cids: ComponentID) -> bool:
"""Check if entity eid has all listed cids.
:param cids: All component ids that need to match
:return: True if all given cids are available for the specified eid
"""
e = eidx[eid]
for cid in cids:
if cid not in e:
return False
return True
[docs]
def eids_by_cids(*cids: ComponentID, has_properties: _OptionalProperties = None) -> list[tuple[EntityID, list[Component]]]:
"""Get eids that match all specified cids.
:param cids: All component ids that need to match
:param as_properties: Optional set/tuple/list of required properties
:return: A list of tuples of entity IDs and components
"""
res = []
at = tuple(cids)
if at in archetype:
return comps_of_archetype(*cids, has_properties=has_properties)
if has_properties:
property_filter = set(has_properties)
for e, have_comps in eidx.items():
if has_properties and not property_filter <= plist[e]:
continue
comps = []
for c in cids:
if c in have_comps:
comps.append(have_comps[c])
else:
break
else:
res.append((e, comps))
return res
[docs]
def cids_of_eid(eid: EntityID) -> list[ComponentID]:
"""Return the list of component IDs of the specified entity.
:param eid: The entity ID
:return: List of component IDs of this entity
:raises UnknownEntityError: If the entity is not registered (anymore).
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
return list(eidx[eid].keys())
[docs]
def comps_of_eid(eid: EntityID, *cids: ComponentID) -> list[Component]:
"""Get components from the eid for the specified cids.
:param eid: the entity id from which to get the components
:param cids: the list of components to fetch
:return: The (filtered) components of eid
:raises UnknownEntityError: If the entity is not registered (anymore).
:raises UnknownComponentError: If the passed component couldn't be found.
While cids_of_eid gets component IDs, this function now gets the actual
components containing the data.
.. note:: If cids is empty or None, returns all components of eid.
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
if not cids:
return list(eidx[eid].values())
try:
return [eidx[eid][cid] for cid in cids]
except KeyError as e:
raise UnknownComponentError(f'Component {e} not registered with entity {eid}') from e
[docs]
def comp_of_eid(eid: EntityID, cid: ComponentID) -> Component:
"""Get a single component from an entity.
:param eid: The entity id from which to get the component
:param cid: The component id to filter for
:return: The requested component of the given entity ID
"""
return comps_of_eid(eid, cid)[0]
[docs]
def eid_of_comp(comp: Component) -> set(EntityID):
"""Find the entity id for object comp.
:param comp: The component to find the entity of
:return: The entity_id the given component belongs to
"""
return oidx[id(comp)]
[docs]
def cid_of_comp(eid: EntityID, comp: Component) -> ComponentID:
"""Get the cid of a component.
:param eid: The entity ID
:param comp: The component to identify
:return: The cid that the given component was added under.
:raises UnknownEntityError: If the entity is not registered (anymore).
:raises UnknownComponentError: If the passed component couldn't be found.
A system only receives an actual component, but cannot know under which
name this was targetted. This function searches the cid of the given
component.
.. note:: This is a relatively expensive operation, but since it will
mostly be used to clean up old connections between entities, it should
be a one-shot and worth the price.
"""
if not has(eid):
raise UnknownEntityError(f'Entity {eid} is not registered')
cids = cids_of_eid(eid)
for cid in cids:
if eidx[eid][cid] is comp:
return cid
raise UnknownComponentError(f'Component {comp} not found in entity {eid}')
[docs]
def run_system(dt: float,
fkt: SystemFunction,
*cids: ComponentID,
has_properties: _OptionalProperties = None,
**kwargs: dict[str, Any]) -> _RunSystemResult:
"""Run the system for the matching cids.
:param dt: delta time since the last frame (miliseconds)
:param fkt: the actual system function
:param cids: the components to run on
:param has_properties: set of required properties
:return: A dictionary with entity IDs as key and the function result as value
The fkt gets the list of all entities that contain the listed
components. The list can further be narrowed down my filtering for given
properties. Then it runs the function for every entity and the requested
components, passing dt as heartbeat.
This function is a direct call. Alternatively, you can use add_system
combined with run_all_systems or run_domain below.
"""
at = tuple(cids)
if at not in archetype:
create_archetype(*cids)
adict = archetype[at]
# need to get call_list upfront, since kill_system could modify the dict
# Also: not running in the if clause is vastly faster than checking an
# empty set.
if has_properties:
property_filter = set(has_properties)
call_list = [(eid, *parms) for eid, parms in adict.items()
if property_filter <= plist[eid]]
else:
call_list = [(eid, *parms) for eid, parms in adict.items()]
return {eid: fkt(dt, eid, *parms, **kwargs) for eid, *parms in call_list}
[docs]
def run_all_systems(dt: float) -> dict[SystemFunction, _RunSystemResult]:
"""Run all registered systems.
:param dt: Delta time
:return: A dict of function as key, and the result of run_system as value
This calls above run_system for all registered systems with their
appropriate components.
"""
return {fkt: run_system(dt, fkt, *comps)
for fkt, comps in sidx.items()}
[docs]
def run_domain(dt: float, domain: DomainID) -> dict[SystemFunction, _RunSystemResult]:
"""Run all systems within domain.
:param dt: Delta time
:return: A dict of function as key, and the result of run_system as value
This is the same as run_all_systems, but limited to a specific domain.
"""
if domain not in didx:
return {}
return {fkt: run_system(dt, fkt, *sidx[fkt])
for fkt in didx[domain]}
[docs]
def create_archetype(*cids: ComponentID) -> None:
"""Create an archetype from the provided cids.
:param cids The list of cids that define the archetype.
:return: None
An archetype is a fixed combination of components. Each time a component
is added or removed from an entity, that entity and its components are
added/removed from the archetype.
This removes the need to search through all entities for the matching cids
in favour of returning the finished list directly.
Since every system is usually run every frame, there are a *lot* of
searches, which is very expensive.
The cost to insert/remove an entity into/from the archetype is a rather
small operation that is only done when the entity is changed.
.. note:: Archetypes are created automatically as soon as a system runs
for the first time. Manually creating an archetype is only useful if
a function wishes to work on a set of entities outside the
`run_system` framework.
.. important: The order of the given cids is important, since the system relies on it.
"""
at = tuple(cids)
if at in archetype:
return
archetype[at] = dict(eids_by_cids(*cids)) if at else {eid: [] for eid in eidx}
[docs]
def remove_archetype(cids: Iterable[ComponentID]) -> None:
"""Remove an archetype from the system. (See `add_archetype`).
:param cids: The list of component IDs that construct the archetype.
:return: None
"""
at = tuple(cids)
del archetype[at]
[docs]
def add_to_archetype(eid: EntityID) -> None:
"""Make sure, eid is registered with all appropriate archetypes.
:param eids: The entity ID to add to the archetype
There should be no need to call this function manually. It's called
internally, when a new component is added to an entity which makes this
entity belong into an archetype it wasn't a member of before.
"""
have_comps = set(cids_of_eid(eid))
for at in archetype:
if not at:
archetype[at][eid] = []
else:
s = set(at)
if s <= have_comps:
archetype[at][eid] = comps_of_eid(eid, *at)
[docs]
def remove_from_archetype(eid: EntityID, cid: ComponentID | None = None) -> None:
"""Make sure, eid is only registered with appropriate archetypes.
:param eids: The entity ID to remove from the archetype
:param cid: An optional component ID.
There should be no need to call this function manually. It's called
internally, when a component is removed from an entity, so that the entity
no longer belongs to that archetype.
"""
for at, adict in archetype.items():
if eid in adict and (cid is None or cid in at):
del adict[eid]
[docs]
def comps_of_archetype(*cids: ComponentID, has_properties: _OptionalProperties = None) -> list[_EntityComponentsBundle]:
"""Return the given archetype.
:param cids The cids that define the archetype.
:param has_properties Optional set of required properties
:return: A list of tuples of (eid, components)
:raises UnknownArchetypeError: If the given archetype doesn't exist.
Primarily used internally by `run_system`.
Returns a list of tuples consisting of eid and components.
"""
at = tuple(cids)
if at not in archetype:
raise UnknownArchetypeError
if has_properties:
property_filter = set(has_properties)
return [(e, comps)
for e, comps in archetype[at].items()
if property_filter <= plist[e]]
else:
return [(e, comps) for e, comps in archetype[at].items()]
[docs]
def set_property(eid: EntityID, property: Property) -> None:
"""Add a property to the given entity.
:param eid: The entity id to add the component to
:param property: A flag or tag that can be filtered, e.g. 'is_drawable'
:return: None
:raises UnknownEntityError: If the entity is not registered (anymore).
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
plist[eid].add(property)
[docs]
def set_properties(eid: EntityID, properties: Iterable[Property]) -> None:
"""Add a list of properties to the given entity.
:param eid: The entity ID to add the properties to.
:param property: The list of properties to add.
This is just a convenience wrapper around `set_property`.
"""
for prop in properties:
set_property(eid, prop)
[docs]
def has_property(eid: EntityID, prop: Property) -> bool:
"""Checks if the given entity has that property.
:param eid: The entity id to add the component to
:param property: The property to check for
:return: True if the property is set for the given entity
:raises UnknownEntityError: If the entity is not registered (anymore).
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
return prop in plist[eid]
[docs]
def remove_property(eid: EntityID, prop: Property) -> None:
"""Removes a property from the given entity.
:param eid: The entity id to add the component to
:param property: The property to remove
:return: None
:raises UnknownEntityError: If the entity is not registered (anymore).
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
plist[eid].remove(prop)
[docs]
def clear_properties(eid: EntityID) -> None:
"""Removes all properties from the given entity.
:param eid: The entity id to add the component to
:return: None
:raises UnknownEntityError: If the entity is not registered (anymore).
"""
if eid not in eidx:
raise UnknownEntityError(f'Entity {eid} is not registered')
plist[eid] = set()
[docs]
def eids_by_property(*properties: Property) -> list[EntityID]:
"""Get a list of entities that match all given properties.
:param properties: All properties that need to match
:return: Entities matching the given properties
"""
property_filter = set(properties)
return [eid for eid, props in plist.items() if property_filter <= props]
[docs]
def purge_by_property(*properties: Property) -> None:
"""Purge entities that match all given properties.
:param properties: All properties that need to match
This is useful to clean up all entities belonging to a sub state without
impacting the remaining registry.
"""
for eid in eids_by_property(*properties):
remove_entity(eid)