diff --git a/docs/index.rst b/docs/index.rst index ff2046a..da44907 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -268,6 +268,54 @@ of the resource, in this case the blog post:: abort(403) # HTTP Forbidden +Combining Permissions +--------------------- + +While the core constructs provided are sufficient for the common simple use +cases, for more complex cases it is possible to combine existing permissions +through the use of bitwise operators to result in a new permission object that +can do the complex validations. For instance, it is to create a permission +where the identity has the both the `"blog_poster"` or `"blog_reviewer"` role +but not the `"under_probation"` role. Example:: + + blog_admin = Permission(RoleNeed('blog_admin')) + blog_poster = Permission(RoleNeed('blog_poster')) + blog_reviewer = Permission(RoleNeed('blog_reviewer')) + under_probation = Permission(RoleNeed('under_probation')) + + prize_permission = ((blog_poster | blog_reviewer) & ~under_probation) + + @app.route('/blog/prizes') + @prize_permission.require() + def prize_redeem(): + # find out what prizes are available to blog users that are not + # under probation + return render_template('prize_redeem.html') + +Any number of these can be chained, but it is also possible to use the +constructor classes directly to combine these things together:: + + from flask.ext.principal import AndPermission, OrPermission + + allperms = AndPermission(blog_poster, blog_reviewer, blog_admin) + anyperms = OrPermission(blog_poster, blog_reviewer, blog_admin) + +Custom Permissions +------------------ + +Sometimes your permissions may be determined by other circumstances specific to +whatever you need to implement. For that you can create custom classes to +address your needs like so:: + + from flask.ext.principal import BasePermission + + class CustomPermission(BasePermission): + def allows(self, identity): + # Implement other conditions that allow this to pass + return False + +These custom permissions can be combined together via the bitwise operators as +explained above. API === diff --git a/flask_principal.py b/flask_principal.py index b8e2ad9..8135ea7 100644 --- a/flask_principal.py +++ b/flask_principal.py @@ -211,18 +211,10 @@ def __exit__(self, *args): return False -class Permission(object): - """Represents needs, any of which must be present to access a resource +class BasePermission(object): + """The Base Permission.""" - :param needs: The needs for this permission - """ - def __init__(self, *needs): - """A set of needs, any of which must be present in an identity to have - access. - """ - - self.needs = set(needs) - self.excludes = set() + http_exception = None def _bool(self): return bool(self.can()) @@ -237,25 +229,29 @@ def __bool__(self): """ return self._bool() - def __and__(self, other): - """Does the same thing as ``self.union(other)`` + def __or__(self, other): + """See ``OrPermission``. """ - return self.union(other) + return self.or_(other) - def __or__(self, other): - """Does the same thing as ``self.difference(other)`` + def or_(self, other): + return OrPermission(self, other) + + def __and__(self, other): + """See ``AndPermission``. """ - return self.difference(other) + return self.and_(other) - def __contains__(self, other): - """Does the same thing as ``other.issubset(self)``. + def and_(self, other): + return AndPermission(self, other) + + def __invert__(self): + """See ``NotPermission``. """ - return other.issubset(self) + return self.invert() - def __repr__(self): - return '<{0} needs={1} excludes={2}>'.format( - self.__class__.__name__, self.needs, self.excludes - ) + def invert(self): + return NotPermission(self) def require(self, http_exception=None): """Create a principal for this permission. @@ -269,6 +265,10 @@ def require(self, http_exception=None): :param http_exception: the HTTP exception code (403, 401 etc) """ + + if http_exception is None: + http_exception = self.http_exception + return IdentityContext(self, http_exception) def test(self, http_exception=None): @@ -286,6 +286,120 @@ def test(self, http_exception=None): with self.require(http_exception): pass + def allows(self, identity): + """Whether the identity can access this permission. + + :param identity: The identity + """ + + raise NotImplementedError + + def can(self): + """Whether the required context for this permission has access + + This creates an identity context and tests whether it can access this + permission + """ + return self.require().can() + + +class _NaryOperatorPermission(BasePermission): + + def __init__(self, *permissions): + self.permissions = set(permissions) + + +# These classes would be unnecessary if we have predicate calculus +# primatives of some kind. + +class OrPermission(_NaryOperatorPermission): + """Result of bitwise ``or`` of BasePermission""" + + def allows(self, identity): + """ + Checks for any of the nested permission instances that allow the + identity and return True, else return False. + + :param identity: The identity. + """ + + for p in self.permissions: + if p.allows(identity): + return True + return False + + +class AndPermission(_NaryOperatorPermission): + """Result of bitwise ``and`` of BasePermission""" + + def allows(self, identity): + """ + Checks for any of the nested permission instances that disallow + the identity and return False, else return True. + + :param identity: The identity. + """ + + for p in self.permissions: + if not p.allows(identity): + return False + return True + + +class NotPermission(BasePermission): + """ + Result of bitwise ``not`` of BasePermission + + Really could be implemented by returning a transformed result of the + source class of itself, but for the sake of clear presentation I am + not doing that. + """ + + def __init__(self, permission): + self.permission = permission + + def invert(self): + return self.permission + + def allows(self, identity): + return not self.permission.allows(identity) + + +class Permission(BasePermission): + """Represents needs, any of which must be present to access a resource + + :param needs: The needs for this permission + """ + def __init__(self, *needs): + """A set of needs, any of which must be present in an identity to have + access. + """ + + self.needs = set(needs) + self.excludes = set() + + def __or__(self, other): + """Does the same thing as ``self.union(other)`` + """ + if isinstance(other, Permission): + return self.union(other) + return super(Permission, self).__or__(other) + + def __sub__(self, other): + """Does the same thing as ``self.difference(other)`` + """ + return self.difference(other) + + def __contains__(self, other): + """Does the same thing as ``other.issubset(self)``. + """ + return other.issubset(self) + + def __repr__(self): + return '<{0} needs={1} excludes={2}>'.format( + self.__class__.__name__, self.needs, self.excludes + ) + def reverse(self): """ Returns reverse of current state (needs->excludes, excludes->needs) @@ -338,14 +452,6 @@ def allows(self, identity): return True - def can(self): - """Whether the required context for this permission has access - - This creates an identity context and tests whether it can access this - permission - """ - return self.require().can() - class Denial(Permission): """ diff --git a/test_principal.py b/test_principal.py index 885ba03..597efa3 100644 --- a/test_principal.py +++ b/test_principal.py @@ -5,6 +5,8 @@ from flask import Flask, Response +from flask_principal import BasePermission, OrPermission, AndPermission +from flask_principal import NotPermission from flask_principal import Principal, Permission, Denial, RoleNeed, \ PermissionDenied, identity_changed, Identity, identity_loaded @@ -12,12 +14,44 @@ admin_permission = Permission(RoleNeed('admin')) admin_or_editor = Permission(RoleNeed('admin'), RoleNeed('editor')) editor_permission = Permission(RoleNeed('editor')) +manager_permission = Permission(RoleNeed('manager')) +admin_or_editor_or_manager = Permission( + RoleNeed('admin'), RoleNeed('editor'), RoleNeed('manager')) + admin_denied = Denial(RoleNeed('admin')) +class RolenamePermission(BasePermission): + def __init__(self, role): + self.role = role + def allows(self, identity): + return RoleNeed(self.role) in identity.provides + +admin_role_permission = RolenamePermission('admin') +editor_role_permission = RolenamePermission('editor') +manager_role_permission = RolenamePermission('manager') +reviewer_role_permission = RolenamePermission('reviewer') + + def _on_principal_init(sender, identity): - if identity.id == 'ali': - identity.provides.add(RoleNeed('admin')) + role_map = { + 'ali': (RoleNeed('admin'),), + 'admin': (RoleNeed('admin'),), + 'editor': (RoleNeed('editor'),), + 'reviewer': (RoleNeed('reviewer'),), + 'admin_editor': (RoleNeed('editor'), RoleNeed('admin')), + 'manager': (RoleNeed('manager'),), + 'manager_editor': (RoleNeed('editor'), RoleNeed('manager')), + 'reviewer_editor': (RoleNeed('editor'), RoleNeed('reviewer')), + 'admin_manager': (RoleNeed('admin'), RoleNeed('manager')), + 'admin_editor_manager': ( + RoleNeed('admin'), RoleNeed('editor'), RoleNeed('manager')), + } + + roles = role_map.get(identity.id) + if roles: + for role in roles: + identity.provides.add(role) class ReraiseException(Exception): @@ -77,6 +111,256 @@ def f(): with admin_or_editor.require(): return Response('hello') + @app.route('/and_base_fail') + def and_base_fail(): + i = mkadmin() + admin_and_editor_rp = (admin_role_permission & editor_role_permission) + identity_changed.send(app, identity=i) + with admin_and_editor_rp.require(): + return Response('fail') + + @app.route('/and_base_success') + def and_base_success(): + i = Identity('admin_editor') + identity_changed.send(app, identity=i) + # using both formerly default, calling parent __and__ + admin_and_editor_rp = (admin_permission & editor_permission) + with admin_and_editor_rp.require(): + return Response('good') + + @app.route('/and_bunch') + def and_bunch(): + result = [] + + bunch = AndPermission( + admin_role_permission, + editor_role_permission, + manager_role_permission, + ) + + identity_changed.send(app, identity=Identity('admin')) + if bunch.can(): + result.append('bad') + + identity_changed.send(app, identity=Identity('manager')) + if bunch.can(): + result.append('bad') + + identity_changed.send(app, identity=Identity('reviewer')) + if bunch.can(): + result.append('bad') + + identity_changed.send(app, identity=Identity('admin_editor_manager')) + if bunch.can(): + result.append('good') + + return ''.join(result) + + @app.route('/and_mixed1') + def and_mixed1(): + admin_and_editor_mixed = (admin_role_permission & editor_permission) + i = Identity('editor') + identity_changed.send(app, identity=i) + with admin_and_editor_mixed.require(): + return Response('fail') + + @app.route('/and_mixed2') # reversed type of the above. + def and_mixed2(): + admin_and_editor_mixed = (admin_permission & editor_role_permission) + i = Identity('admin_editor') + identity_changed.send(app, identity=i) + with admin_and_editor_mixed.require(): + return Response('good') + + @app.route('/or_base') + def or_base(): + i = mkadmin() + admin_or_editor_rp = (admin_role_permission | editor_role_permission) + identity_changed.send(app, identity=i) + with admin_or_editor_rp.require(): + return Response('hello') + + @app.route('/or_bunch') + def or_bunch(): + result = [] + + bunch = OrPermission( + admin_role_permission, + editor_role_permission, + manager_role_permission, + reviewer_role_permission, + ) + + identity_changed.send(app, identity=Identity('admin')) + if bunch.can(): + result.append('good') + + identity_changed.send(app, identity=Identity('manager')) + if bunch.can(): + result.append('good') + + identity_changed.send(app, identity=Identity('reviewer')) + if bunch.can(): + result.append('good') + + return ''.join(result) + + @app.route('/or_mixed1') + def or_mixed1(): + result = [] + admin_or_editor_mixed = (admin_role_permission | editor_permission) + + i = Identity('admin') + identity_changed.send(app, identity=i) + with admin_or_editor_mixed.require(): + result.append('good') + + i = Identity('editor') + identity_changed.send(app, identity=i) + with admin_or_editor_mixed.require(): + result.append('good') + + return Response(''.join(result)) + + @app.route('/or_mixed2') # reversed type of the above. + def or_mixed2(): + result = [] + admin_or_editor_mixed = (admin_permission | editor_role_permission) + + i = Identity('admin') + identity_changed.send(app, identity=i) + with admin_or_editor_mixed.require(): + result.append('good') + + i = Identity('editor') + identity_changed.send(app, identity=i) + with admin_or_editor_mixed.require(): + result.append('good') + + return Response(''.join(result)) + + @app.route('/not_base') + def not_base(): + result = [] + not_admin_perm = ~admin_role_permission + + identity_changed.send(app, identity=Identity('admin')) + if not_admin_perm.can(): + result.append('admin') + + identity_changed.send(app, identity=Identity('editor')) + if not_admin_perm.can(): + result.append('editor') + + identity_changed.send(app, identity=Identity('admin_manager')) + if not_admin_perm.can(): + result.append('admin_manager') + + return Response(''.join(result)) + + @app.route('/mixed_ops_fail') + def mixed_ops_fail(): + mixed_perms = (admin_permission | manager_permission | + (reviewer_role_permission & editor_role_permission)) + + i = Identity('editor') + identity_changed.send(app, identity=i) + with mixed_perms.require(): + return Response('fail') + + @app.route('/mixed_ops1') + def mixed_ops1(): + result = [] + mixed_perms = (admin_permission | manager_permission | + (reviewer_role_permission & editor_role_permission)) + + i = Identity('reviewer_editor') + identity_changed.send(app, identity=i) + with mixed_perms.require(): + result.append('good') + + i = Identity('manager') + identity_changed.send(app, identity=i) + with mixed_perms.require(): + result.append('good') + + i = Identity('admin') + identity_changed.send(app, identity=i) + with mixed_perms.require(): + result.append('good') + + return Response(''.join(result)) + + @app.route('/mixed_ops2') + def mixed_ops2(): + result = [] + mixed_perms = ((admin_permission & editor_permission) | + (manager_role_permission & editor_role_permission)) + + i = Identity('manager_editor') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('good') + + i = Identity('manager') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('bad') + + i = Identity('editor') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('bad') + + i = Identity('admin_editor') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('good') + + i = Identity('admin') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('bad') + + return Response(''.join(result)) + + @app.route('/mixed_ops3') + def mixed_ops3(): + result = [] + mixed_perms = ( + ((admin_permission & editor_permission) | + (manager_role_permission & editor_role_permission)) & + ~(manager_role_permission & admin_permission) & + ~reviewer_role_permission + ) + + i = Identity('manager_editor') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('good') + + i = Identity('admin_editor') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('good') + + i = Identity('admin_manager') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('bad') + + i = Identity('manager_editor_admin') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('bad') + + i = Identity('reviewer') + identity_changed.send(app, identity=i) + if mixed_perms.can(): + result.append('bad') + + return Response(''.join(result)) + @app.route('/g') @admin_permission.require() @editor_permission.require() @@ -108,12 +392,16 @@ def k(): def l(): s = [] if not admin_or_editor: - s.append("not admin") + s.append("not admin_or_editor") + if not (admin_permission or editor_permission): + s.append("not (admin or editor)") i = Identity('ali') identity_changed.send(app, identity=i) if admin_or_editor: - s.append("now admin") + s.append("now admin_or_editor") + if admin_permission or editor_permission: + s.append("now admin or editor") return Response('\n'.join(s)) @app.route("/m") @@ -137,6 +425,18 @@ def o(): admin_or_editor.test() return Response("OK") + @app.route("/o2") + def o2(): + (admin_permission | editor_permission).test() + return Response("OK") + + @app.route("/o3") + def o3(): + i = mkadmin() + identity_changed.send(app, identity=i) + (admin_permission | editor_permission).test() + return Response("OK") + @app.route("/p") def p(): admin_or_editor.test(404) @@ -150,6 +450,23 @@ def mkadmin(): return i +class BasePermissionUnitTests(unittest.TestCase): + + def test_or_permission(self): + admin_or_editor_rp = (admin_role_permission | editor_role_permission) + self.assertTrue(isinstance(admin_or_editor_rp, OrPermission)) + self.assertEqual(admin_or_editor_rp.permissions, + set([admin_role_permission, editor_role_permission])) + + def test_and_permission(self): + admin_and_editor_rp = (admin_role_permission & editor_role_permission) + self.assertTrue(isinstance(admin_and_editor_rp, AndPermission)) + self.assertEqual(admin_and_editor_rp.permissions, + set([admin_role_permission, editor_role_permission])) + + # TODO test manual construction + + class PrincipalUnitTests(unittest.TestCase): def test_permission_union(self): @@ -187,23 +504,67 @@ def test_reverse_permission(self): d = p.reverse() assert ('a', 'b') in d.excludes - def test_permission_and(self): + def test_permission_difference(self): p1 = Permission(RoleNeed('boss')) p2 = Permission(RoleNeed('lackey')) - p3 = p1 & p2 - p4 = p1.union(p2) + p3 = p1 - p2 + p4 = p1.difference(p2) + + # parity with set operations + p3needs = p1.needs - p2.needs assert p3.needs == p4.needs + assert p3.needs == p3needs + + def test_permission_difference_excludes(self): + p1 = Permission(RoleNeed('boss')).reverse() + p2 = Permission(RoleNeed('lackey')).reverse() + + p3 = p1 - p2 + p4 = p1.difference(p2) + + # parity with set operations + p3excludes = p1.excludes - p2.excludes + + assert p3.excludes == p4.excludes + assert p3.excludes == p3excludes def test_permission_or(self): p1 = Permission(RoleNeed('boss'), RoleNeed('lackey')) p2 = Permission(RoleNeed('lackey'), RoleNeed('underling')) p3 = p1 | p2 - p4 = p1.difference(p2) + p4 = p1.union(p2) + + # Ensure that an `or` between sets also result in the expected + # behavior. As expected, as "any of which must be present to + # access a resource". + p3needs = p1.needs | p2.needs assert p3.needs == p4.needs + assert p3.needs == p3needs + + def test_permission_or_excludes(self): + p1 = Permission(RoleNeed('boss'), RoleNeed('lackey')).reverse() + p2 = Permission(RoleNeed('lackey'), RoleNeed('underling')).reverse() + + p3 = p1 | p2 + p4 = p1.union(p2) + + # Ensure that an `or` between sets also result in the expected + # behavior. As expected, as "any of which must be present to + # access a resource". + p3excludes = p1.excludes | p2.excludes + + assert p3.excludes == p4.excludes + assert p3.excludes == p3excludes + + def test_permission_not(self): + p1 = Permission(RoleNeed('boss'), RoleNeed('lackey')) + p2 = ~p1 + p3 = ~p2 + assert p3 == p1 def test_contains(self): p1 = Permission(RoleNeed('boss'), RoleNeed('lackey')) @@ -244,6 +605,36 @@ def test_or_permissions(self): assert self.client.open('/e').data == b'hello' assert self.client.open('/f').data == b'hello' + def test_base_or_permissions(self): + assert self.client.open('/or_base').data == b'hello' + + def test_or_permissions_bunch(self): + self.assertEqual(self.client.open('/or_bunch').data, b'goodgoodgood') + + def test_base_not_permissions(self): + self.assertEqual(self.client.open('/not_base').data, b'editor') + + def test_mixed_or_permissions(self): + assert self.client.open('/or_mixed1').data == b'goodgood' + assert self.client.open('/or_mixed2').data == b'goodgood' + + def test_base_and_permissions(self): + self.assertRaises(PermissionDenied, self.client.open, '/and_base_fail') + self.assertEqual(self.client.open('/and_base_success').data, b'good') + + def test_mixed_and_permissions(self): + self.assertRaises(PermissionDenied, self.client.open, '/and_mixed1') + self.assertEqual(self.client.open('/and_mixed2').data, b'good') + + def test_mixed_and_or_permissions_fail(self): + self.assertRaises(PermissionDenied, + self.client.open, '/mixed_ops_fail') + + def test_mixed_and_or_permissions(self): + self.assertEqual(self.client.open('/mixed_ops1').data, b'goodgoodgood') + self.assertEqual(self.client.open('/mixed_ops2').data, b'goodgood') + self.assertEqual(self.client.open('/mixed_ops3').data, b'goodgood') + def test_and_permissions_view_denied(self): self.assertRaises(PermissionDenied, self.client.open, '/g') @@ -258,6 +649,9 @@ def test_and_permissions_view_with_http_exc_decorated(self): response = self.client.open("/k") assert response.status_code == 403 + def test_and_permissions_bunch(self): + self.assertEqual(self.client.open('/and_bunch').data, b'good') + def test_and_permissions_view_with_custom_errhandler(self): app = mkapp() @@ -273,8 +667,10 @@ def handle_permission_denied(error): def test_permission_bool(self): response = self.client.open('/l') assert response.status_code == 200 - assert b'not admin' in response.data - assert b'now admin' in response.data + assert b'not admin_or_editor' in response.data + assert b'not (admin or editor)' in response.data + assert b'now admin_or_editor' in response.data + assert b'now admin or editor' in response.data def test_denied_passes(self): response = self.client.open("/m") @@ -286,6 +682,13 @@ def test_denied_fails(self): def test_permission_test(self): self.assertRaises(PermissionDenied, self.client.open, '/o') + def test_permission_operator_test(self): + self.assertRaises(PermissionDenied, self.client.open, '/o2') + + response = self.client.open('/o3') + assert response.status_code == 200 + assert response.data == b'OK' + def test_permission_test_with_http_exc(self): response = self.client.open("/p") assert response.status_code == 404