## $Id: authentication.py 17528 2023-08-01 06:40:01Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""Authentication for Kofa.
"""
import grok
import time
import re
from zope.i18n import translate
from zope.event import notify
from zope.component import getUtility, getUtilitiesFor
from zope.component.interfaces import IFactory
from zope.interface import Interface, implementedBy
from zope.schema import getFields
from zope.securitypolicy.interfaces import (
IPrincipalRoleMap, IPrincipalRoleManager)
from zope.pluggableauth.factories import Principal
from zope.pluggableauth.plugins.session import SessionCredentialsPlugin
from zope.pluggableauth.plugins.httpplugins import (
HTTPBasicAuthCredentialsPlugin)
from zope.pluggableauth.interfaces import (
ICredentialsPlugin, IAuthenticatorPlugin, IAuthenticatedPrincipalFactory,
AuthenticatedPrincipalCreated)
from zope.publisher.interfaces import IRequest
from zope.password.interfaces import IPasswordManager
from zope.securitypolicy.principalrole import principalRoleManager
from waeup.kofa.interfaces import (
ILocalRoleSetEvent, IUserAccount, IAuthPluginUtility, IPasswordValidator,
IKofaPrincipal, IKofaPrincipalInfo, IKofaPluggable, IBatchProcessor,
IGNORE_MARKER, IFailedLoginInfo)
from waeup.kofa.utils.batching import BatchProcessor
from waeup.kofa.permissions import get_all_roles
from waeup.kofa.interfaces import MessageFactory as _
def setup_authentication(pau):
    """Set up a pluggable authentication utility (PAU).

    Assigns the names of the credentials plugins and of the
    authenticator plugin to `pau`. Afterwards `pau` is handed over to
    every utility registered for `IAuthPluginUtility`, so that
    third-party code and subpackages can modify the setup.
    """
    credentials_plugin_names = (
        'No Challenge if Authenticated',
        'xmlrpc-credentials',
        'credentials',
        )
    pau.credentialsPlugins = credentials_plugin_names
    pau.authenticatorPlugins = ('users',)
    # Registered utilities are applied last and may therefore override
    # the plugin names set above.
    for _name, plugin_util in getUtilitiesFor(IAuthPluginUtility):
        plugin_util.register(pau)
def get_principal_role_manager():
    """Return the role manager responsible for principals.

    Inside a site, the role manager adapted from that site (the
    portal) is returned. Outside of any site the global
    `principalRoleManager` is returned instead.
    """
    site = grok.getSite()
    if site is None:
        return principalRoleManager
    return IPrincipalRoleManager(site)
class KofaSessionCredentialsPlugin(
        grok.GlobalUtility, SessionCredentialsPlugin):
    """Session credentials plugin reading usernames/passwords from webforms.

    Registered as global `ICredentialsPlugin` utility named
    ``credentials``.
    """
    grok.name('credentials')
    grok.provides(ICredentialsPlugin)

    # Form fields the login name and the password are picked from.
    loginfield, passwordfield = 'form.login', 'form.password'
    loginpagename = 'login'
class KofaXMLRPCCredentialsPlugin(
        grok.GlobalUtility, HTTPBasicAuthCredentialsPlugin):
    """Plugin that picks usernames/passwords from basic-auth headers.

    As XMLRPC requests send/post their authentication credentials in
    HTTP basic-auth headers, we need a plugin that can handle this.

    This plugin, however, does no challenging. If a user does not
    provide basic-auth infos, we will not ask for some. This is
    correct as we plan to communicate with machines.

    This plugin is planned to be used in "PluggableAuthentications"
    registered with `University` instances.
    """
    grok.name('xmlrpc-credentials')
    grok.provides(ICredentialsPlugin)

    def logout(self, request):
        """Always return `False`.

        Basic auth does not provide any logout possibility.
        """
        return False

    def challenge(self, request):
        """Always return `False`.

        XMLRPC is for machines, so there is no need to challenge.
        """
        return False
class KofaPrincipalInfo(object):
    """An implementation of IKofaPrincipalInfo.

    A Kofa principal info is created with id, title, description,
    email, phone, public_name and user_type. The attributes
    `credentialsPlugin` and `authenticatorPlugin` are initialized
    with `None`.
    """
    grok.implements(IKofaPrincipalInfo)

    def __init__(self, id, title, description, email, phone, public_name,
                 user_type):
        self.id = id
        self.title = title
        self.description = description
        self.email = email
        self.phone = phone
        self.public_name = public_name
        self.user_type = user_type
        self.credentialsPlugin = None
        self.authenticatorPlugin = None

    def __eq__(self, obj):
        """Tell whether `obj` carries the same values as this info.

        Two infos are equal if all attributes set in the constructor
        are equal. If `obj` lacks one of these attributes, it is
        considered unequal.
        """
        # A fresh sentinel never equals a real attribute value, so a
        # missing attribute on `obj` always results in inequality.
        missing = object()
        return all(
            getattr(self, name) == getattr(obj, name, missing)
            for name in (
                'id', 'title', 'description', 'email', 'phone',
                'public_name', 'user_type', 'credentialsPlugin',
                'authenticatorPlugin'))

    def __ne__(self, obj):
        """Inverse of `__eq__`.

        Python 2 does not derive ``!=`` from ``__eq__``; without this
        method ``==`` and ``!=`` could give contradicting answers.
        """
        return not self.__eq__(obj)
class KofaPrincipal(Principal):
    """A portal principal.

    Kofa principals extend ordinary principals by the extra
    attributes `email`, `phone`, `public_name` and `user_type`.
    """
    grok.implements(IKofaPrincipal)

    def __init__(self, id, title=u'', description=u'', email=u'',
                 phone=None, public_name=u'', user_type=u'', prefix=None):
        """Store the given values as attributes.

        If a `prefix` is given, the principal id becomes
        ``<prefix>.<id>``. The `groups` list starts out empty.
        """
        if prefix is None:
            self.id = id
        else:
            self.id = '%s.%s' % (prefix, id)
        self.title = title
        self.description = description
        self.groups = []
        self.email = email
        self.phone = phone
        self.public_name = public_name
        self.user_type = user_type

    def __repr__(self):
        return 'KofaPrincipal(%r)' % self.id
class AuthenticatedKofaPrincipalFactory(grok.MultiAdapter):
    """Creates 'authenticated' Kofa principals.

    Adapts (principal info, request) to a KofaPrincipal instance.

    This adapter is used by the standard PAU to transform
    KofaPrincipalInfos into KofaPrincipal instances.
    """
    grok.adapts(IKofaPrincipalInfo, IRequest)
    grok.implements(IAuthenticatedPrincipalFactory)

    def __init__(self, info, request):
        self.info = info
        self.request = request

    def __call__(self, authentication):
        """Build a `KofaPrincipal` from the adapted principal info.

        The principal id is prefixed with the prefix of
        `authentication`. An `AuthenticatedPrincipalCreated` event is
        sent before the new principal is returned.
        """
        info = self.info
        principal = KofaPrincipal(
            id=info.id,
            title=info.title,
            description=info.description,
            email=info.email,
            phone=info.phone,
            public_name=info.public_name,
            user_type=info.user_type,
            prefix=authentication.prefix,
            )
        created = AuthenticatedPrincipalCreated(
            authentication, principal, info, self.request)
        notify(created)
        return principal
class FailedLoginInfo(grok.Model):
    """Persistent record of failed logins.

    Stores a counter `num` and a value `last`, which `increase` sets
    to the current `time.time()` timestamp.
    """
    grok.implements(IFailedLoginInfo)

    def __init__(self, num=0, last=None):
        self.num = num
        self.last = last

    def as_tuple(self):
        """Return the stored values as tuple ``(num, last)``."""
        return (self.num, self.last)

    def set_values(self, num=0, last=None):
        """Set both values at once and mark the object as changed."""
        self.num = num
        self.last = last
        self._p_changed = True

    def increase(self):
        """Increase the counter by one and store the current time."""
        self.set_values(num=self.num + 1, last=time.time())

    def reset(self):
        """Set the counter back to zero and forget the last time."""
        self.set_values(num=0, last=None)
class Account(grok.Model):
    """Kofa user accounts store infos about a user.

    Beside the usual data and an (encrypted) password, accounts also
    have a persistent attribute `failed_logins` which is an instance
    of `waeup.kofa.authentication.FailedLoginInfo`.

    This attribute can be manipulated directly (set new value,
    increase values, or reset).
    """
    grok.implements(IUserAccount)

    def __init__(self, name, password, title=None, description=None,
                 email=None, phone=None, public_name=None, roles=None):
        """Create an account.

        If no `title` is given, `name` is used as title. `roles` is a
        list of site role names; ``None`` (the default) means no
        roles.
        """
        if roles is None:
            # Avoid a mutable default argument shared between calls.
            roles = []
        self.name = name
        if title is None:
            title = name
        self.title = title
        self.description = description
        self.email = email
        self.phone = phone
        self.public_name = public_name
        self.suspended = False
        self.setPassword(password)
        self.setSiteRolesForPrincipal(roles)
        # We don't want to share this dict with other accounts
        self._local_roles = dict()
        self.failed_logins = FailedLoginInfo()

    def setPassword(self, password):
        """Store `password` encoded by the 'SSHA' password manager."""
        passwordmanager = getUtility(IPasswordManager, 'SSHA')
        self.password = passwordmanager.encodePassword(password)

    def checkPassword(self, password):
        """Tell whether `password` is accepted for this account.

        Returns `False` if

        - maintenance mode was enabled by someone else and this is
          not the 'admin' account,
        - `password` is not a string,
        - no password is stored for this account,
        - the secure-password validator reports errors for `password`,
        - the account is suspended.

        Otherwise the result of the 'SSHA' password manager check is
        returned.
        """
        try:
            blocker = grok.getSite()['configuration'].maintmode_enabled_by
            if blocker and blocker != self.name and self.name != 'admin':
                return False
        except (TypeError, KeyError):  # in unit tests
            pass
        if not isinstance(password, basestring):
            return False
        if not self.password:
            # unset/empty passwords do never match
            return False
        # Do not accept password if password is insecure. The validator
        # returns a list of error messages, which is empty for secure
        # passwords.
        validator = getUtility(IPasswordValidator)
        if validator.validate_secure_password(password, password):
            return False
        if self.suspended:
            return False
        passwordmanager = getUtility(IPasswordManager, 'SSHA')
        return passwordmanager.checkPassword(self.password, password)

    def getSiteRolesForPrincipal(self):
        """Return the names of all ``waeup.`` roles of this account.

        The roles are looked up in the current principal role manager.
        """
        prm = get_principal_role_manager()
        roles = [x[0] for x in prm.getRolesForPrincipal(self.name)
                 if x[0].startswith('waeup.')]
        return roles

    def setSiteRolesForPrincipal(self, roles):
        """Make `roles` the site roles of this account.

        Old ``waeup.`` roles not contained in `roles` are unset, all
        given roles are assigned. Nothing happens if the roles did
        not change.
        """
        prm = get_principal_role_manager()
        old_roles = self.getSiteRolesForPrincipal()
        if sorted(old_roles) == sorted(roles):
            return
        for role in old_roles:
            # Remove old roles, not to be set now...
            if role.startswith('waeup.') and role not in roles:
                prm.unsetRoleForPrincipal(role, self.name)
        for role in roles:
            # Convert role to ASCII string to be in line with the
            # event handler
            prm.assignRoleToPrincipal(str(role), self.name)
        return

    roles = property(getSiteRolesForPrincipal, setSiteRolesForPrincipal)

    def getLocalRoles(self):
        """Return the dict mapping local role ids to lists of objects."""
        return self._local_roles

    def notifyLocalRoleChanged(self, obj, role_id, granted=True):
        """Record that local role `role_id` on `obj` was (un)granted.

        Role ids without any remaining object are removed from the
        local roles dict.
        """
        objects = self._local_roles.get(role_id, [])
        if granted and obj not in objects:
            objects.append(obj)
        if not granted and obj in objects:
            objects.remove(obj)
        self._local_roles[role_id] = objects
        if len(objects) == 0:
            del self._local_roles[role_id]
        # The dict was modified in place; tell the persistence
        # machinery explicitly.
        self._p_changed = True
        return
class UserAuthenticatorPlugin(grok.GlobalUtility):
    """Authenticator plugin working on accounts in the ``users`` container.

    Registered as global `IAuthenticatorPlugin` utility named
    ``users``.
    """
    grok.implements(IAuthenticatorPlugin)
    grok.provides(IAuthenticatorPlugin)
    grok.name('users')

    def _principal_info(self, account):
        """Create the `KofaPrincipalInfo` describing `account`."""
        return KofaPrincipalInfo(
            id=account.name,
            title=account.title,
            description=account.description,
            email=account.email,
            phone=account.phone,
            public_name=account.public_name,
            user_type=u'user')

    def authenticateCredentials(self, credentials):
        """Return a principal info for valid `credentials`.

        `credentials` must be a dict containing the keys ``login``
        and ``password``. `None` is returned if the credentials are
        malformed, no matching account exists or the password is not
        accepted by the account.
        """
        if not isinstance(credentials, dict):
            return None
        if not ('login' in credentials and 'password' in credentials):
            return None
        account = self.getAccount(credentials['login'])
        if account is None:
            return None
        # The following shows how 'time penalties' could be enforced
        # on failed logins. First three failed logins are 'for
        # free'. After that the user has to wait for 1, 2, 4, 8, 16,
        # 32, ... seconds before a login can succeed.
        # There are, however, some problems to discuss, before we
        # really use this in all authenticators.

        #num, last = account.failed_logins.as_tuple()
        #if (num > 2) and (time.time() < (last + 2**(num-3))):
        #    # tried login while account still blocked due to previous
        #    # login errors.
        #    return None
        if not account.checkPassword(credentials['password']):
            #account.failed_logins.increase()
            return None
        return self._principal_info(account)

    def principalInfo(self, id):
        """Return a principal info for account `id` or `None`."""
        account = self.getAccount(id)
        if account is None:
            return None
        return self._principal_info(account)

    def getAccount(self, login):
        """Return the account stored under `login` or `None`."""
        userscontainer = self.getUsersContainer()
        if userscontainer is None:
            return
        return userscontainer.get(login, None)

    def addAccount(self, account):
        """Add `account` to the users container, if there is one."""
        userscontainer = self.getUsersContainer()
        if userscontainer is None:
            return
        # XXX: complain if name already exists...
        userscontainer.addAccount(account)

    def addUser(self, name, password, title=None, description=None):
        """Create a user in the users container, if there is one."""
        userscontainer = self.getUsersContainer()
        if userscontainer is None:
            return
        userscontainer.addUser(name, password, title, description)

    def getUsersContainer(self):
        """Return the ``users`` container of the current site.

        Returns `None` if there is no current site or the site has no
        ``users`` container. All callers in this class handle `None`.
        """
        site = grok.getSite()
        if site is None:
            return None
        return site.get('users', None)
class PasswordValidator(grok.GlobalUtility):
    """Utility validating passwords.

    Both methods return a list of translated error messages, which is
    empty if the password passed all checks.
    """
    grok.implements(IPasswordValidator)

    def validate_password(self, pw, pw_repeat):
        """Check that `pw` has at least 6 characters and equals `pw_repeat`.
        """
        errors = []
        too_short = len(pw) < 6
        if too_short:
            errors.append(translate(_(
                'Password must have at least 6 characters.')))
        if pw != pw_repeat:
            errors.append(translate(_('Passwords do not match.')))
        return errors

    def validate_secure_password(self, pw, pw_repeat):
        """Check that `pw` is secure and equals `pw_repeat`.

        `pw` must match the regular expression::

          ^(?=.*[A-Z])(?=.*[a-z])(?=.*[0-9]).{8,}$

          ^            Start anchor
          (?=.*[A-Z])  Ensure password has at least one uppercase letter.
          (?=.*[a-z])  Ensure password has at least one lowercase letter.
          (?=.*[0-9])  Ensure password has at least one digit.
          .{8,}        Ensure password has at least 8 characters.
          $            End anchor
        """
        secure_pw = re.compile(
            r"^(?=.*[A-Z])(?=.*[a-z])(?=.*[0-9]).{8,}$")
        errors = []
        if secure_pw.match(pw) is None:
            errors.append(translate(_(
                'Passwords must be at least 8 characters long, '
                'must contain at least one uppercase letter, '
                'one lowercase letter and one digit.')))
        if pw != pw_repeat:
            errors.append(translate(_('Passwords do not match.')))
        return errors
class LocalRoleSetEvent(object):
    """Event telling that a local role was granted or withdrawn.

    Carries the `object` concerned, the `role_id`, the `principal_id`
    and the flag `granted`.
    """
    grok.implements(ILocalRoleSetEvent)

    def __init__(self, object, role_id, principal_id, granted=True):
        self.object, self.role_id = object, role_id
        self.principal_id, self.granted = principal_id, granted
@grok.subscribe(IUserAccount, grok.IObjectRemovedEvent)
def handle_account_removed(account, event):
    """When an account is removed, local and global roles might
    have to be deleted.

    Unsets all local roles recorded in the account on the respective
    objects and afterwards all site roles of the account on the
    current site.
    """
    local_roles = account.getLocalRoles()
    principal_id = account.name
    for local_role_id, granted_objects in local_roles.items():
        for target in granted_objects:
            try:
                local_manager = IPrincipalRoleManager(target)
            except TypeError:
                # No Account object, no role manager, no local roles to remove
                continue
            local_manager.unsetRoleForPrincipal(local_role_id, principal_id)
    site_manager = IPrincipalRoleManager(grok.getSite())
    for site_role_id in account.getSiteRolesForPrincipal():
        site_manager.unsetRoleForPrincipal(site_role_id, principal_id)
@grok.subscribe(IUserAccount, grok.IObjectAddedEvent)
def handle_account_added(account, event):
    """When an account is added, the local owner role and the global
    AcademicsOfficer role must be set.
    """
    owner_role = 'waeup.local.Owner'
    # The account's principal becomes local owner of the account itself...
    IPrincipalRoleManager(account).assignRoleToPrincipal(
        owner_role, account.name)
    # ...and gets the AcademicsOfficer role on the current site.
    IPrincipalRoleManager(grok.getSite()).assignRoleToPrincipal(
        'waeup.AcademicsOfficer', account.name)
    # Finally we have to notify the user account that the local role
    # of the same object has changed
    role_set = LocalRoleSetEvent(
        account, owner_role, account.name, granted=True)
    notify(role_set)
@grok.subscribe(Interface, ILocalRoleSetEvent)
def handle_local_role_changed(obj, event):
    """Tell the user account concerned that a local role has changed.

    Does nothing if there is no current site, the site has no
    ``users`` container or the principal of `event` has no account
    in that container.
    """
    site = grok.getSite()
    users = None if site is None else site.get('users', None)
    if users is None:
        return
    principal_id = event.principal_id
    if principal_id in users.keys():
        users[principal_id].notifyLocalRoleChanged(
            event.object, event.role_id, event.granted)
@grok.subscribe(Interface, grok.IObjectRemovedEvent)
def handle_local_roles_on_obj_removed(obj, event):
    """Announce the withdrawal of all local roles of a removed object.

    For each entry of the principal role map of `obj` a
    `LocalRoleSetEvent` with ``granted=False`` is sent. Objects which
    cannot be adapted to `IPrincipalRoleMap` are skipped.
    """
    try:
        entries = IPrincipalRoleMap(obj).getPrincipalsAndRoles()
    except TypeError:
        # no map, no roles to remove
        return
    for role_id, principal_id, _setting in entries:
        withdrawn = LocalRoleSetEvent(
            obj, role_id, principal_id, granted=False)
        notify(withdrawn)
class UserAccountFactory(grok.GlobalUtility):
    """A factory for user accounts.

    This factory is only needed for imports.
    """
    grok.implements(IFactory)
    grok.name(u'waeup.UserAccount')
    # Must be a plain string; a trailing comma would make it a tuple.
    title = u"Create a user."
    description = u"This factory instantiates new user account instances."

    def __call__(self, *args, **kw):
        """Return a new `Account` without name and with empty password.

        Any arguments passed in are ignored.
        """
        return Account(name=None, password='')

    def getInterfaces(self):
        """Return the interfaces implemented by `Account`."""
        return implementedBy(Account)
class UserProcessor(BatchProcessor):
    """The User Processor processes user accounts, i.e. `Account` objects in
    the ``users`` container.

    The `roles` columns must contain Python list
    expressions like ``['waeup.PortalManager', 'waeup.ImportManager']``.

    The processor does not import local roles. These can be imported
    by means of batch processors in the academic section.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'userprocessor'
    grok.name(util_name)

    name = u'User Processor'
    iface = IUserAccount

    location_fields = ['name', ]
    factory_name = 'waeup.UserAccount'

    mode = None

    def parentsExist(self, row, site):
        """Tell whether `site` contains a ``users`` container."""
        return 'users' in site.keys()

    def entryExists(self, row, site):
        """Tell whether the user named in `row` already exists."""
        return row['name'] in site['users'].keys()

    def getParent(self, row, site):
        """Return the ``users`` container of `site`."""
        return site['users']

    def getEntry(self, row, site):
        """Return the account named in `row` or `None`."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['name'])
        return None

    def addEntry(self, obj, row, site):
        """Add account `obj` to the ``users`` container."""
        self.getParent(row, site).addAccount(obj)

    def delEntry(self, row, site):
        """Remove the account named in `row`, if it exists, and log it."""
        user = self.getEntry(row, site)
        if user is None:
            return
        parent = self.getParent(row, site)
        grok.getSite().logger.info(
            '%s - %s - User removed' % (self.name, row['name']))
        del parent[user.name]

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Values equal to `IGNORE_MARKER` and keys `obj` has no
        attribute for are skipped. All changes are logged.
        """
        changed = []
        for key, value in row.items():
            # We cannot simply set the roles attribute in this loop
            # because we can't assure that the name attribute is set
            # before the roles attribute is set. Fields to be ignored
            # and unknown attributes are skipped as well.
            if (key == 'roles' or value == IGNORE_MARKER
                    or not hasattr(obj, key)):
                continue
            setattr(obj, key, value)
            changed.append('%s=%s' % (key, value))
        roles = row.get('roles', IGNORE_MARKER)
        if roles not in ('', IGNORE_MARKER):
            # NOTE(review): eval() executes arbitrary expressions from
            # the import file; only import data from trusted sources.
            evalvalue = eval(roles)
            if isinstance(evalvalue, list):
                obj.roles = evalvalue
                changed.append('roles=%s' % roles)
        # Log actions...
        grok.getSite().logger.info(
            '%s - %s - %s - updated: %s' % (
                self.name, filename, row['name'], ', '.join(changed)))

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        In addition to the checks of the base class, an error is
        added for each role given in `row` which is not contained in
        the roles returned by `get_all_roles`.
        """
        errs, inv_errs, conv_dict = super(
            UserProcessor, self).checkConversion(row, mode=mode)
        # We need to check if roles exist.
        roles = row.get('roles', IGNORE_MARKER)
        known_roles = [entry[0] for entry in get_all_roles()]
        if roles not in ('', IGNORE_MARKER):
            # NOTE(review): eval() executes arbitrary expressions from
            # the import file; only import data from trusted sources.
            errs.extend(
                ('roles', 'invalid role')
                for role in eval(roles) if role not in known_roles)
        return errs, inv_errs, conv_dict
class UsersPlugin(grok.GlobalUtility):
    """A plugin that updates users.
    """
    grok.implements(IKofaPluggable)
    grok.name('users')

    deprecated_attributes = []

    def setup(self, site, name, logger):
        """Nothing to set up."""
        return

    def update(self, site, name, logger):
        """Bring all accounts in ``site['users']`` up to date.

        Attributes of `IUserAccount` fields missing on an account are
        set to the missing value of the field, a missing
        `failed_logins` attribute is added and attributes listed in
        `deprecated_attributes` are deleted. Each change is logged.
        """
        fields = getFields(IUserAccount).items()
        for user in site['users'].values():
            # Add new attributes
            for field_name, field in fields:
                if hasattr(user, field_name):
                    continue
                setattr(user, field_name, field.missing_value)
                logger.info(
                    'UsersPlugin: %s attribute %s added.' % (
                        user.name, field_name))
            if not hasattr(user, 'failed_logins'):
                # add attribute `failed_logins`...
                user.failed_logins = FailedLoginInfo()
                logger.info(
                    'UsersPlugin: attribute failed_logins added.')
            # Remove deprecated attributes
            for attr_name in self.deprecated_attributes:
                try:
                    delattr(user, attr_name)
                    logger.info(
                        'UsersPlugin: %s attribute %s deleted.' % (
                            user.name, attr_name))
                except AttributeError:
                    # The attribute was not set on this account.
                    pass
class UpdatePAUPlugin(grok.GlobalUtility):
    """A plugin that updates a local PAU.

    We insert an 'xmlrpc-credentials' PAU-plugin into a site's PAU if
    it is not present already. A 'credentials' plugin must be
    registered already.

    XXX: This Plugin fixes a shortcoming of waeup.kofa 1.5. Sites created or
         updated afterwards do not need this plugin and it should be removed.
    """
    grok.implements(IKofaPluggable)
    grok.name('site-pluggable-auth')

    def setup(self, site, name, logger):
        """Nothing to set up."""
        return

    def update(self, site, name, logger):
        """Insert 'xmlrpc-credentials' right before 'credentials'.

        Does nothing if the PAU of `site` already lists the
        'xmlrpc-credentials' plugin.
        """
        pau = site.getSiteManager()['PluggableAuthentication']
        if 'xmlrpc-credentials' in pau.credentialsPlugins:
            return
        names = list(pau.credentialsPlugins)
        position = names.index('credentials')
        pau.credentialsPlugins = tuple(
            names[:position] + ['xmlrpc-credentials'] + names[position:])