# The contents of this file are subject to the Mozilla Public License
# (MPL) Version 1.1 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License
# at http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS"
# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
# the License for the specific language governing rights and
# limitations under the License.
#
# The Original Code is Pytyp (http://www.acooke.org/pytyp)
# The Initial Developer of the Original Code is Andrew Cooke.
# Portions created by the Initial Developer are Copyright (C) 2011
# Andrew Cooke. All Rights Reserved.
#
# Alternatively, the contents of this file may be used under the terms
# of the LGPL license (the GNU Lesser General Public License,
# http://www.gnu.org/licenses/lgpl.html), in which case the provisions
# of the LGPL License are applicable instead of those above.
#
# If you wish to allow use of your version of this file only under the
# terms of the LGPL License and not to allow others to use your version
# of this file under the MPL, indicate your decision by deleting the
# provisions above and replace them with the notice and other provisions
# required by the LGPL License. If you do not delete the provisions
# above, a recipient may use your version of this file under either the
# MPL or the LGPL License.
from abc import ABCMeta, abstractmethod
from collections import Sequence, Mapping, ByteString, Container
from itertools import count
from numbers import Number
from reprlib import recursive_repr
from weakref import WeakSet, WeakKeyDictionary
from pytyp.util import items, make_recursive_block
# TODO abc_name in wrong place? see cls
# TODO weak dict for types?
block_recursive_type = make_recursive_block(lambda args: (id(args[0]), id(args[1])),
lambda _: RecursiveType.throw())
class TSMeta(ABCMeta):
'''
The metaclass for type specifications. This extends ``ABCMeta`` to include
the instance hook and adds a few utilities.
>>> normalize(Rec(a=int))
Rec(a=int)
>>> normalize({'a': int})
Rec(a=int)
>>> normalize((int, str))
Rec(int,str)
>>> normalize(Rec((int,), {'a':str}))
Rec(Rec(int),Rec(a=str))
>>> normalize([int])
Seq(int)
>>> normalize([])
Seq(Cls(object))
>>> Seq([int])
Seq(Seq(int))
>>> normalize(int)
int
>>> class Foo: pass
>>> normalize(Foo)
Cls(Foo)
>>> normalize(Alt(int, str))
Alt(int,str)
>>> normalize(Alt(int, Foo))
Alt(int,Cls(Foo))
>>> normalize(Opt([int]))
Opt(Seq(int))
>>> normalize([int, str])
Rec(int,str)
'''
def __instancecheck__(cls, instance):
try:
# if false may still be subclass
if cls.__instancehook__(instance):
return True
except AttributeError:
pass
return super().__instancecheck__(instance)
@staticmethod
@make_recursive_block()
def _normalize(spec):
'''
This rewrites the "shorthand" form (without ``Cls()``, using ``[]``
instead of ``Seq()``, etc).
'''
if isinstance(spec, list):
if not spec:
return Seq()
if len(spec) == 1:
return Seq(normalize(spec[0]))
else:
return Rec(*map(normalize, spec))
elif isinstance(spec, dict):
return Rec(_dict=dict((name, normalize(spec[name]))
for name in spec))
elif isinstance(spec, tuple):
return Rec(*tuple(normalize(s) for s in spec))
elif isinstance(spec, type):
if issubclass(spec, Delayed):
spec._spec = normalize(spec._spec)
return spec
elif NoNormalize in spec.__bases__:
return spec
else:
return Cls(spec)
else:
return normalize(type(spec))
@recursive_repr()
def __repr__(cls):
try:
return cls._reprhook()
except AttributeError:
return super().__repr__()
normalize = TSMeta._normalize
'''
Type specifications are built using constructors like ``Seq()`` and ``Rec()``,
but it is also possible to use a "shorthand" form, in which ``()`` and ``{}``
are used for for records, and ``[]`` for sequences.
This routine rewrites the shorthand into the standard format.
>>> normalize({'a': int})
Rec(a=int)
>>> normalize((int, str))
Rec(int,str)
>>> normalize([int])
Seq(int)
>>> normalize([])
Seq(Cls(object))
>>> normalize(Opt([int]))
Opt(Seq(int))
>>> normalize([int, str])
Rec(int,str)
'''
class NoStructural():
'''
This identifies subclasses of type specifications, which should not need
to have structural verification. It's equivalent to TypeSpec, but avoids
the ABC registration logic.
'''
pass
class NoNormalize():
'''
Immediate subclasses are considered to be type specifications and are not
normalized.
'''
pass
class ReprBase(NoStructural, metaclass=TSMeta):
@classmethod
def _reprhook(cls):
try:
return '{}({})'.format(cls._abc_name, cls._fmt_args())
except AttributeError:
return cls.__name__
class TypeSpec(ReprBase):
'''
Subclasses must provide their own cls._abc_instance_registry as a WeakSet.
'''
@classmethod
def _structuralcheck(cls, instance, check=isinstance):
def verify(_, vsn):
for (v, s, _) in vsn:
if not check(v, s): raise TypeError
return True
try:
return cls._backtrack(instance, verify)
except TypeError:
return False
@classmethod
def register_instance(cls, instance):
if isinstance(instance, cls):
return # Already an instance
cls._abc_instance_registry.add(instance)
@classmethod
@block_recursive_type
def __instancehook__(cls, instance):
try:
if instance in cls._abc_instance_registry:
return True
except TypeError: # unhashable
pass
# only do structural check for classes that are not already subclasses
# or we get confusing results with empty containers (and slower code)
if not isinstance(instance, NoStructural):
return cls._structuralcheck(instance)
@classmethod
def _for_each(cls, value, callback):
return callback(cls, cls._vsn(value))
class Atomic(metaclass=ABCMeta):
'''
These are formatted without "Cls(...)".
'''
Atomic.register(Number)
Atomic.register(ByteString)
Atomic.register(str)
Atomic.register(bool)
class RecursiveType(TypeError):
@staticmethod
def throw(_): raise RecursiveType
class DelayedTypeError(TypeError): pass
[docs]class NoBacktrack(Exception, metaclass=ABCMeta):
'''
If this exception (or a subclass, or a registered class) occurs within
``_backtrack()`` then it will be allowed to "escape" to the surrounding
code (other exceptions will be caught and used to trigger backtracking
over sum types).
'''
pass
def expand(value, spec, callback):
return normalize(spec)._for_each(value, callback)
def type_error(value, spec):
raise TypeError('Type {1} inconsistent with {0!r}.'.format(value, spec))
def _sort_key(key):
key = Rec.OptKey.unpack(key)
if isinstance(key, int):
return key - 10000 # no maxint in python 3
else:
return abs(hash(key))
def _hashable_types(args, kargs):
'''
Given a list of args and kargs, both of which are type specifications,
generate a tuple of that information that is sorted so that it can be used as
a reliable key for the same information (ie normalized and ordered).
Also, order the args (indexed specs) before the kargs so that generating
the repr can handle them correctly.
'''
types = dict((name, normalize(karg)) for (name, karg) in kargs.items())
for (index, arg) in zip(count(), args):
types[index] = normalize(arg)
return tuple((key, types[key]) for key in sorted(types.keys(), key=_sort_key))
def _unhashable_types(types):
return (tuple(spec for (name, spec) in types if isinstance(name, int)),
dict((name, spec) for (name, spec) in types if not isinstance(name, int)))
def _polymorphic_subclass(bases, args, kargs):
abc = bases[0]
args = tuple(normalize(arg) for arg in args)
kargs = dict((name, normalize(karg))
for (name, karg) in kargs.items())
types = _hashable_types(args, kargs)
if types not in abc._abc_polymorphic_cache:
# replaced a standard class definition with this to help with debugging
# as it was confusing when everything had the same name
subclass = type(abc)(abc.__name__ + '_' + str(abs(hash(types))),
tuple(list(bases) + [TypeSpec, NoNormalize]),
{'_abc_type_arguments': types,
'_abc_instance_registry': WeakSet(),
'_abc_name': abc.__name__})
abc._abc_polymorphic_cache[types] = subclass
return abc._abc_polymorphic_cache[types]
class Product:
@classmethod
def _backtrack(cls, value, callback):
return callback(cls, cls._vsn(value))
class Sum:
@classmethod
def _backtrack(cls, value, callback):
for (v, s, n) in cls._vsn(value):
try:
return callback(cls, [(v, s, n)])
except Exception as e:
if isinstance(e, NoBacktrack): raise
raise TypeError('No alternative for {}'.format(cls))
[docs]class Seq(Product):
'''
This describes a sequence of values, all of the same type. For example:
>>> isinstance([1,2,3], Seq(int))
True
>>> isinstance(('four', 'five'), Seq(str))
True
>>> isinstance([1,'two',None], Seq(int))
False
If no type is given, then ``object`` is assumed (which is the same as "anything"):
>>> isinstance([1,'two',None], Seq())
True
'''
_abc_polymorphic_cache = {}
@abstractmethod
def __getitem__(self, index):
raise IndexError
def __new__(cls, *args, **kargs):
if cls is Seq: # check args only when being used as a class factory
if kargs or len(args) > 1:
raise TypeError('Seq requires a single, unnamed argument')
if not args:
args = (object,)
spec = _polymorphic_subclass((cls, Sequence), args, kargs)
if args[0] is not object:
Seq().register(spec)
return spec
else:
return super().__new__(cls, *args, **kargs)
@classmethod
def _vsn(cls, value):
try:
(name, spec) = cls._abc_type_arguments[0]
except AttributeError:
(name, spec) = (None, None)
for v in value:
yield (v, spec, name)
@classmethod
def _fmt_args(cls):
return cls._abc_type_arguments[0][1]
class FmtArgsMixin:
@classmethod
def _fmt_args(cls):
def args():
count = 0
for (name, spec) in cls._abc_type_arguments:
if name == count and count > -1:
count += 1
yield str(spec)
else:
count = -1
yield '{0}={1}'.format(name, spec)
return ','.join(args())
[docs]class Rec(Product, FmtArgsMixin):
'''
This describes `records` - containers with contents that are accessed via a name.
Usually the name is a string:
>>> isinstance({'a':1, 'b':'two'}, Rec(a=int, b=str))
True
But it can also be an integer (unnamed arguments to ``Rec()`` are numbered from 0):
>>> isinstance((1, 'two'), Rec(int, str))
True
Or even arbitrary objects:
>>> foo = object()
>>> isinstance({foo: 1}, Rec(_dict={foo: int}))
True
'''
_abc_polymorphic_cache = {}
class OptKey:
def __init__(self, name=''):
self.name = name
def __repr__(self):
return '__' + str(self.name)
@staticmethod
def unpack(name):
if isinstance(name, Rec.OptKey):
return name.name
else:
return name
@staticmethod
def pack(name):
try:
if name.startswith('__'):
return Rec.OptKey(name[2:])
except AttributeError:
pass
return name
# these are used only for dicts of program arguments
def __eq__(self, other): return str(self) == str(other)
def __ne__(self, other): return str(self) != str(other)
def __le__(self, other): return str(self) <= str(other)
def __ge__(self, other): return str(self) >= str(other)
def __lt__(self, other): return str(self) < str(other)
def __gt__(self, other): return str(self) > str(other)
def __hash__(self): return hash(str(self))
def __str__(self): return '__' + str(self.name)
@abstractmethod
def __getitem__(self, index):
raise IndexError
def __new__(cls, *args, _dict=None, **kargs):
if cls is Rec: # check args only when being used as a class factory
if _dict: kargs.update(_dict)
if not args and not kargs: kargs = {'__': ANY}
# careful to use normalised form here
kargs = dict((Rec.OptKey.pack(name), arg) for (name, arg) in kargs.items())
spec = _polymorphic_subclass((cls, Container), args, kargs)
if args or kargs != {Rec.OptKey(''):ANY}:
Rec().register(spec)
return spec
else:
return super().__new__(cls, *args, **kargs)
@classmethod
def _vsn(cls, value):
try:
names = value.keys()
except AttributeError:
names = range(len(value))
default = None
if hasattr(cls, '_abc_type_arguments'):
names = set(names)
# drop existing and optional names
for (name, spec) in cls._abc_type_arguments:
unpacked = Rec.OptKey.unpack(name)
if unpacked:
try:
yield (value[unpacked], spec, name)
except KeyError:
if not isinstance(name, Rec.OptKey):
raise TypeError('Missing value for {0}'.format(name))
names.discard(unpacked)
else:
default = spec
if names:
if default:
for name in names:
yield (value[name], default, name)
else:
raise TypeError('Additional field(s): {0}'.format(', '.join(names)))
else:
for name in names:
yield (value[name], None, name)
@classmethod
def _to_dict(cls):
return dict(cls._abc_type_arguments)
@classmethod
def _int_keys(cls):
for (name, _) in cls._abc_type_arguments:
if not isinstance(Rec.OptKey.unpack(name), int):
return False
return True
[docs]class Atr(Product, FmtArgsMixin):
'''
This describes the attributes on an object. Methods are not supported (instead,
use `function annotations
<http://docs.python.org/py3k/reference/compound_stmts.html#index-22>`_ on the
method itself::
>>> class Foo:
... def __init__(self, a, b):
... self.a = a
... self.b = b
>>> foo = Foo(1, 'two')
>>> Atr(a=int, b=str).register_instance(foo)
The ``Cls()`` constructor (described below) also has a "shorthand" for defining
classes with attributes:
>>> class Bar: pass
>>> Cls(Bar, a=int, b=str)
And(Cls(Bar),Atr(a=int,b=str))
'''
_abc_polymorphic_cache = {}
@abstractmethod
def __getattr__(self, key):
raise KeyError
def __new__(cls, *args, _dict=None, **kargs):
if cls is Atr: # check args only when being used as a class factory
if _dict: kargs.update(_dict)
if args or not kargs:
raise TypeError('Atr requires named arguments')
spec = _polymorphic_subclass((cls,), (), kargs)
return spec
else:
return super().__new__(cls, *args, **kargs)
@classmethod
def _vsn(cls, value):
if hasattr(cls, '_abc_type_arguments'):
# drop existing and optional names
for (name, spec) in cls._abc_type_arguments:
try:
yield (getattr(value, name), spec, name)
except AttributeError:
raise TypeError('Missing attribute for {0}'.format(name))
else:
for (name, attr) in items(value):
yield (attr, None, name)
[docs]class Alt(Sum, FmtArgsMixin):
'''
This describes a value that can have more that one type. For example,
``Alt(int,str)`` can be an ``int`` or a ``str``::
>>> isinstance(1, Alt(number=int, text=str))
True
>>> isinstance('two', Alt(number=int, text=str))
True
>>> isinstance(3.0, Alt(number=int, text=str))
False
This is like ``Or()`` below, but lets you add a name to the different
alternatives (this name is available during iteration - see below - and
what it means will depend on how the type specification is being used).
'''
# this makes no sense as a mixin - it exists only to specialise the
# functionality provided by the Polymorphic factory above (ie to hold
# the cache, provide class methods, etc)
_abc_polymorphic_cache = {}
def __new__(cls, *args, **kargs):
if cls is Alt: # check args only when being used as a class factory
if (kargs and args) or not (args or kargs):
raise TypeError('Alt requires named or unnamed arguments, but not both')
spec = _polymorphic_subclass((cls,), args, kargs)
return spec
else:
return super().__new__(cls, *args, **kargs)
@classmethod
def _vsn(cls, value):
if hasattr(cls, '_abc_type_arguments'):
for (name, spec) in cls._abc_type_arguments:
yield (value, spec, name)
else:
yield (value, None, None)
try:
raise TypeError('No alternative for {0}'.format(value))
except DelayedTypeError:
raise TypeError('No alternative'.format(value))
@classmethod
def __subclasshook__(cls, subclass):
if cls not in (Alt, Opt) and cls._structuralcheck(subclass, check=issubclass):
return True
else:
return NotImplemented
@classmethod
def _on(cls, value, **choices):
for choice in choices:
for (name, spec) in cls._abc_type_arguments:
if choice == name and isinstance(value, spec):
return choices[choice](value)
raise TypeError('Cannot dispatch {0} on {1}'.format(value, cls))
[docs]class Opt(Alt, NoNormalize):
'''
This describes a common case of ``Alt()`` where the value is either the given
type, or ``None``.
>>> isinstance(1, Opt(int))
True
>>> isinstance(None, Opt(int))
True
>>> issubclass(Opt(int), Alt(value=int,none=type(None)))
True
'''
# defining this as a subclass of Alt, rather than simple function that calls
# Alt just gives a nicer formatting
def __new__(cls, *args, **kargs):
if cls is Opt: # check args only when being used as a class factory
if kargs or len(args) != 1:
raise TypeError('Opt requires a single, unnamed argument')
kargs = {'none':type(None), 'value':args[0]}
spec = _polymorphic_subclass((cls,), (), kargs)
if args[0] != object:
Alt(*kargs).register(spec)
return spec
else:
return super().__new__(cls, *args, **kargs)
@classmethod
def _fmt_args(cls):
return cls._abc_type_arguments[1][1]
[docs]class Cls(Product):
'''
This describes a particular class. You don't need to use it normally (just use
the class itself), but it is used internally::
>>> Seq(int) is Seq(Cls(int))
True
'''
_abc_class_cache = WeakKeyDictionary()
def __new__(cls, class_=object, **kargs):
if class_ not in cls._abc_class_cache:
# TODO - convert to call to type with name
class __Cls(Cls, TypeSpec, NoNormalize):
_abc_instance_registry = WeakSet()
_abc_class = class_
@classmethod
def _vsn(cls, value):
yield (value, cls._abc_class, None)
@classmethod
def __subclasshook__(cls, subclass):
if issubclass(subclass, cls._abc_class):
return True
else:
return NotImplemented
@classmethod
def _reprhook(cls):
if issubclass(cls._abc_class, Atomic):
return cls._abc_class.__name__
else:
return super()._reprhook()
@classmethod
def _fmt_args(cls):
return cls._abc_class.__name__
cls._abc_class_cache[class_] = __Cls
spec = cls._abc_class_cache[class_]
spec._abc_name = Cls.__name__
if class_ is not object:
Cls().register(spec)
if kargs:
return And(spec, Atr(**kargs))
else:
return spec
ANY = Cls()
'''
A useful pre-defined type specification that matches any object. It is the same
as ``Cls(object)`` (which can also be written as ``Cls()``).
'''
[docs]class Sub(ReprBase):
'''
This is like ``Cls()``, but uses ``issubclass()`` rather than ``isinstance()``.
It doesn't make much sense as a type specification, and is arguably an ugly hack,
but it is very useful when using :mod:`dispatch by type pytyp.spec.dispatch`.
'''
_abc_class_cache = WeakKeyDictionary()
def __new__(cls, spec=object):
if spec not in cls._abc_class_cache:
class __Sub(Sub, NoNormalize):
_abc_class = spec
_abc_name = Sub.__name__
@classmethod
@block_recursive_type
def __instancehook__(cls, instance):
return issubclass(instance, cls._abc_class)
@classmethod
def _fmt_args(cls):
return cls._abc_class.__name__
cls._abc_class_cache[spec] = __Sub
spec = cls._abc_class_cache[spec]
return spec
class _Set(TypeSpec):
def __new__(cls, *args, **kargs):
if _Set in cls.__bases__: # check args only when being used as a class factory
if kargs or not args:
raise TypeError('{} requires unnamed arguments'.format(cls.__name__))
args = cls.transitive_ordered(args)
abc = _polymorphic_subclass((cls,), args, {})
abc._set_name = cls.__name__
return abc
else:
return super().__new__(cls, *args, **kargs)
@classmethod
def transitive_ordered(cls, args):
'''
>>> list(And.transitive_ordered([int,And(str,float)]))
[<class 'str'>, <class 'float'>, <class 'int'>]
>>> list(And.transitive_ordered([float,And(int,str)]))
[<class 'str'>, <class 'float'>, <class 'int'>]
'''
def expand(args, flat):
for arg in args:
if isinstance(arg, type) and issubclass(arg, cls) and hasattr(arg, '_abc_type_arguments'):
expand((spec for (_, spec) in arg._abc_type_arguments), flat)
else:
try:
flat.add(arg._abc_class)
except AttributeError:
flat.add(arg)
return flat
return sorted(expand(args, set()), key=id)
@classmethod
def _vsn(cls, value):
if hasattr(cls, '_abc_type_arguments'):
# drop existing and optional names
for (_, spec) in cls._abc_type_arguments:
yield (value, spec, None)
else:
raise TypeError('Cannot expand {}'.format(cls.__name__))
@classmethod
def __subclasshook__(cls, subclass):
if _Set is cls or _Set in cls.__bases__:
return NotImplemented
else:
return cls._structuralcheck(subclass, check=issubclass)
@classmethod
def _fmt_args(cls):
return ','.join(str(spec) for (_, spec) in cls._abc_type_arguments)
[docs]class And(Product, _Set):
'''
This describes something with several different types `at the same time`::
>>> isinstance([1,2,3], And(list, Seq(int)))
True
>>> isinstance((1,2,3), And(list, Seq(int)))
False
>>> isinstance((1,2,3), Seq(int))
True
'''
_abc_polymorphic_cache = {}
[docs]class Or(Sum, _Set):
'''
This describes something that can is one of several different types (and we
don't know which). It is very like ``Alt()`` above, except that the
alternatives cannot be named.
>>> isinstance(1, Or(int, str))
True
>>> isinstance('two', Or(int, str))
True
>>> isinstance(3.0, Or(int, str))
False
'''
_abc_polymorphic_cache = {}
class Delayed(TypeSpec, NoNormalize):
_count = 0
def __new__(cls, *args, **kargs):
cls._count += 1
return type(cls.__name__ + '_' + str(cls._count),
(cls,),
{'_spec': None,
'_defined': False,
'_abc_name': 'Delayed'})
@classmethod
def set(cls, spec):
if cls._defined:
raise DelayedTypeError('Delayed defined')
cls._spec = normalize(spec)
cls._defined = True
@classmethod
def get(cls):
if not cls._defined:
raise DelayedTypeError('Delayed not defined')
return cls._spec
@classmethod
def register(cls, subclass):
return cls.get().register(subclass)
@classmethod
def register_instance(cls, instance):
return cls.get().register_instance(instance)
@classmethod
@block_recursive_type
def __instancehook__(cls, instance):
return cls.get().__instancehook__(instance)
@classmethod
def _vsn(cls, value):
return cls.get()._vsn(value)
@classmethod
def _for_each(cls, value, callback):
return cls.get()._for_each(value, callback)
@classmethod
def _backtrack(cls, value, callback):
return cls.get()._backtrack(value, callback)
@classmethod
def _structuralcheck(cls, instance):
return cls.get()._structuralcheck(instance)
@classmethod
def _fmt_args(cls):
return cls.get()
@classmethod
def _on(cls, value, **choices):
return cls.get()._on(value, **choices)
def copy_registry(abc, target):
for cls in abc.__subclasses__():
if cls not in (target, target()):
target().register(cls)
copy_registry(Sequence, Seq)
Seq().register(tuple)
copy_registry(Mapping, Rec)
Rec().register(tuple)
if __name__ == "__main__":
import doctest
print(doctest.testmod())