diff --git a/src/flask_principal.py b/src/flask_principal.py index c60a6eb..3d2b82e 100644 --- a/src/flask_principal.py +++ b/src/flask_principal.py @@ -11,6 +11,9 @@ """ from __future__ import with_statement +from typing import Optional +from typing import Callable +import typing_extensions __version__ = '0.4.0' @@ -21,8 +24,9 @@ from collections import namedtuple -from flask import g, session, current_app, abort, request +from flask import g, session, current_app, abort, request, Response from flask.signals import Namespace +import werkzeug PY3 = sys.version_info[0] == 3 @@ -139,19 +143,19 @@ class Identity(object): Needs that are provided by this identity should be added to the `provides` set after loading. """ - def __init__(self, id, auth_type=None): + def __init__(self, id: Optional[type[str | int]], auth_type: Optional[type[str | None]]=None) -> None: self.id = id self.auth_type = auth_type - self.provides = set() + self.provides: set[Need] = set() - def can(self, permission): + def can(self, permission: Permission) -> bool: """Whether the identity has access to the permission. :param permission: The permission to test provision for. """ return permission.allows(self) - def __repr__(self): + def __repr__(self) -> str: return '<{0} id="{1}" auth_type="{2}" provides={3}>'.format( self.__class__.__name__, self.id, self.auth_type, self.provides ) @@ -160,11 +164,11 @@ def __repr__(self): class AnonymousIdentity(Identity): """An anonymous identity""" - def __init__(self): + def __init__(self) -> None: Identity.__init__(self, None) -class IdentityContext(object): +class IdentityContext: """The context of an identity for a permission. .. note:: The principal is usually created by the flask_principal.Permission.require method @@ -175,19 +179,19 @@ class IdentityContext(object): flow is continued (context manager) or the function is executed (decorator). """ - def __init__(self, permission, http_exception=None): + def __init__(self, permission: Permission, http_exception: Optional[int | werkzeug.Response]=None) -> None: self.permission = permission self.http_exception = http_exception """The permission of this principal """ @property - def identity(self): + def identity(self) -> Identity: """The identity of this principal """ return g.identity - def can(self): + def can(self) -> bool: """Whether the identity has access to the permission """ return self.identity.can(self.permission) @@ -200,38 +204,38 @@ def _decorated(*args, **kw): return rv return _decorated - def __enter__(self): + def __enter__(self) -> None: # check the permission here if not self.can(): if self.http_exception: abort(self.http_exception, self.permission) raise PermissionDenied(self.permission) - def __exit__(self, *args): + def __exit__(self, *args) -> typing_extensions.Literal[False]: return False -class Permission(object): +class Permission: """Represents needs, any of which must be present to access a resource :param needs: The needs for this permission """ - def __init__(self, *needs): + def __init__(self, *needs: set[Need]) -> None: """A set of needs, any of which must be present in an identity to have access. """ self.perms = {n: True for n in needs} - def _bool(self): + def _bool(self) -> bool: return bool(self.can()) - def __nonzero__(self): + def __nonzero__(self) -> bool: """Equivalent to ``self.can()``. """ return self._bool() - def __bool__(self): + def __bool__(self) -> bool: """Equivalent to ``self.can()``. """ return self._bool() @@ -341,7 +345,7 @@ def issubset(self, other): self.excludes.issubset(other.excludes) ) - def allows(self, identity): + def allows(self, identity: Identity) -> bool: """Whether the identity can access this permission. :param identity: The identity @@ -368,7 +372,7 @@ class Denial(Permission): Shortcut class for passing excluded needs. """ - def __init__(self, *excludes): + def __init__(self, *excludes) -> None: self.perms = {e: False for e in excludes} @@ -379,7 +383,7 @@ def session_identity_loader(): return identity -def session_identity_saver(identity): +def session_identity_saver(identity) -> None: session['identity.id'] = identity.id session['identity.auth_type'] = identity.auth_type session.modified = True @@ -393,7 +397,7 @@ class Principal(object): identification. :param skip_static: Whether to ignore static endpoints. """ - def __init__(self, app=None, use_sessions=True, skip_static=False): + def __init__(self, app=None, use_sessions=True, skip_static=False) -> None: self.identity_loaders = deque() self.identity_savers = deque() # XXX This will probably vanish for a better API @@ -403,7 +407,7 @@ def __init__(self, app=None, use_sessions=True, skip_static=False): if app is not None: self.init_app(app) - def _init_app(self, app): + def _init_app(self, app) -> None: from warnings import warn warn(DeprecationWarning( '_init_app is deprecated, use the new init_app ' @@ -411,7 +415,7 @@ def _init_app(self, app): ) self.init_app(app) - def init_app(self, app): + def init_app(self, app) -> None: if hasattr(app, 'static_url_path'): self._static_path = app.static_url_path else: @@ -424,7 +428,7 @@ def init_app(self, app): self.identity_loader(session_identity_loader) self.identity_saver(session_identity_saver) - def set_identity(self, identity): + def set_identity(self, identity) -> None: """Set the current identity. :param identity: The identity to set @@ -472,18 +476,18 @@ def save_identity_to_weird_usecase(identity): self.identity_savers.appendleft(f) return f - def _set_thread_identity(self, identity): + def _set_thread_identity(self, identity) -> None: g.identity = identity identity_loaded.send(current_app._get_current_object(), identity=identity) - def _on_identity_changed(self, app, identity): + def _on_identity_changed(self, app, identity) -> None: if self._is_static_route(): return self.set_identity(identity) - def _on_before_request(self): + def _on_before_request(self) -> None: if self._is_static_route(): return