# Copyright 2012-2014 Brian May
#
# This file is part of python-tldap.
#
# python-tldap is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# python-tldap is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with python-tldap.  If not, see <http://www.gnu.org/licenses/>.
""" Contains base class used for tldap objects. """
import six
import tldap
import tldap.options
import tldap.exceptions
import tldap.manager
import tldap.fields
import tldap.modlist
import tldap.dn
import ldap3.core.exceptions
import copy
# Module-level CharField instance; judging by its name it is the fallback
# definition for the objectClass attribute (TODO confirm against the
# metaclass that consumes it).  It is marked required and max_instances is
# left as None.
_default_object_class_field = tldap.fields.CharField(
    required=True,
    max_instances=None,
)
def subclass_exception(name, parents, module):
    """ Build a new class at runtime.

    :param name: Name of the class to create.
    :param parents: Tuple of base classes for the new class.
    :param module: Value stored as the ``__module__`` of the new class.
    :return: The newly created class.
    """
    return type(name, parents, {'__module__': module})
class LDAPobject(six.with_metaclass(LDAPmeta)):
    """ The base class used for tldap objects.

    Instances cache the values last written to the directory in
    ``_db_values``; :meth:`save` uses that cache to decide between adding a
    new entry and modifying an existing one.  The ``_meta`` attribute and the
    ``AlreadyExists`` / ``DoesNotExist`` exception classes used below are not
    defined in this class; presumably they are supplied by the metaclass.
    """

    schema_list = []
    """ Class variable to be overridden for class that provides a list of
    schemas to be used. """

    @property
    def dn(self):
        """Get the current dn."""
        return self._dn

    @property
    def pk(self):
        """Get the value of the attribute named by ``_meta.pk``."""
        return getattr(self, self._meta.pk)

    def __eq__(self, other):
        """ Two objects are equal when they have exactly the same type and
        the same pk value. """
        if type(self) != type(other):
            return False
        if self.pk != other.pk:
            return False
        return True

    def __ne__(self, other):
        """ Inverse of :meth:`__eq__`. """
        return not (self == other)

    def get_fields(self):
        """ Yield ``(name, value)`` for every field name reported by
        ``_meta.get_all_field_names()``. """
        for i in self._meta.get_all_field_names():
            yield i, getattr(self, i)

    def __init__(self, using=None, settings=None, **kwargs):
        """ Create a new, unsaved, object.

        :param using: The LDAP database alias; defaults to
            ``tldap.DEFAULT_LDAP_ALIAS``.
        :param settings: Opaque value stored on the object and later passed
            to :meth:`get_default_base_dn`.
        :param kwargs: Initial field values, plus optionally ``dn`` *or*
            ``base_dn``.
        :raises TypeError: If an unknown keyword argument is given.
        :raises ValueError: If both ``dn`` and ``base_dn`` are given.
        """
        if using is None:
            using = tldap.DEFAULT_LDAP_ALIAS

        # None means "nothing known to be stored in the directory yet"
        self._db_values = None
        self._alias = using
        self._settings = settings
        self._dn = None
        self._base_dn = None
        self.force_replace = set()

        # every field gets a value: the supplied one, or the field's
        # conversion of an empty value list
        fields = self._meta.fields
        for field in fields:
            if field.name in kwargs:
                value = kwargs.pop(field.name)
            else:
                value = field.to_python([])
            setattr(self, field.name, value)

        if 'base_dn' in kwargs:
            self._base_dn = kwargs.pop('base_dn')

        if 'dn' in kwargs:
            self._dn = kwargs.pop('dn')

        # anything left over was not a field, dn or base_dn
        for key in kwargs:
            raise TypeError(
                "'%s' is an invalid keyword argument for this function" % key)

        if self._dn is not None and self._base_dn is not None:
            raise ValueError("Makes no sense to set both dn and base_dn")

        if self._dn is None and self._base_dn is None:
            self._base_dn = self._meta.base_dn

    @classmethod
    def get_default_base_dn(cls, using, settings):
        """ Get the default base_dn for this *class*.

        :param cls: This class.
        :param using: The LDAP database alias.
        :param settings: A set of parameters that may be useful in derived
            classes.
        :return: Fully qualified base dn. May be None if unsuccessful.
        """
        assert using is not None

        base_dn = cls._meta.base_dn
        if base_dn is None:
            # fall back to a value looked up in the connection's settings
            key = cls._meta.base_dn_setting
            if key is not None:
                connection = tldap.connections[using]
                if key in connection.settings_dict:
                    base_dn = connection.settings_dict[key]
        return base_dn

    def _rdn_to_dn(self, name):
        """ Convert the rdn to a fully qualified DN for the specified LDAP
        connection.

        :param self: rdn belongs to this tldap object.
        :param name: rdn to convert.
        :return: fully qualified DN.
        :raises tldap.exceptions.ValidationError: If the attribute value is
            None or a list.
        """
        value = getattr(self, name)
        if value is None:
            raise tldap.exceptions.ValidationError(
                "Cannot use %s in dn as it is None" % name)
        if isinstance(value, list):
            raise tldap.exceptions.ValidationError(
                "Cannot use %s in dn as it is a list" % name)

        base_dn = self._base_dn
        if base_dn is None:
            using = self._alias
            assert using is not None
            base_dn = self.get_default_base_dn(using, self._settings)
        assert base_dn is not None

        # prepend the new rdn to the parsed base dn
        split_base = tldap.dn.str2dn(base_dn)
        split_new_dn = [[(name, value, 1)]] + split_base

        new_dn = tldap.dn.dn2str(split_new_dn)
        return new_dn

    def save(self, force_add=False, force_modify=False):
        """
        Saves the current instance. Override this in a subclass if you want to
        control the saving process.

        :param self: object to save.
        :param force_add: Assume object doesn't already exist and must be
            created.
        :param force_modify: Assume object already exists and must be updated.
        :raises tldap.exceptions.ValidationError: If no dn is set and none
            can be derived from the pk.
        :raises ValueError: If both force_add and force_modify are set.
        """
        # what database should we be using?
        using = self._alias
        assert using is not None

        # derive the dn from the pk attribute if it was not given
        if self._dn is None and self._meta.pk is not None:
            self._dn = self._rdn_to_dn(self._meta.pk)

        if self._dn is None:
            raise tldap.exceptions.ValidationError(
                "Need a full DN for this object")

        if force_add and force_modify:
            raise ValueError(
                "Cannot force both insert and updating in model saving.")

        if force_add:
            self._add()
        elif force_modify:
            self._modify()
        elif self._db_values is not None:
            # cached db values exist, so treat the entry as already stored
            self._modify()
        else:
            self._add()

    save.alters_data = True

    def delete(self):
        """ Delete this object from the LDAP server.

        :param self: object to delete.
        """
        self._delete()

    delete.alters_data = True

    def _get_moddict(self):
        """ Build the dictionary of attribute values to send to the server.

        As a side effect the rdn attribute and the objectClass attribute of
        this object may be set: the rdn attribute is filled in from the dn
        when empty, and objectClass is replaced by the union of its current
        value and ``_meta.object_classes``.

        :return: Dictionary mapping each field name to its list of db values.
        :raises ValueError: If the rdn attribute does not contain the value
            used in the dn.
        """
        dn0k, dn0v, _ = tldap.dn.str2dn(self._dn)[0][0]

        # objectClass = attribute + class meta setup
        # (build a new list; ``+=`` here would extend the list stored on the
        # instance in place)
        current_object_class = getattr(self, "objectClass", [])
        default_object_class = (
            current_object_class + list(self._meta.object_classes))

        # remove duplicate classes
        default_object_class = list(set(default_object_class))
        for v in default_object_class:
            assert isinstance(v, six.string_types)

        # start with an empty dictionary
        moddict = {}

        # generate moddict values
        fields = self._meta.fields
        for field in fields:
            name = field.name
            value = getattr(self, name)

            # if dn attribute not given, try to set it, otherwise just convert
            # value
            if name.lower() == dn0k.lower():
                if isinstance(value, list) and len(value) == 0:
                    value = [dn0v]
                    setattr(self, name, value)
                elif value is None:
                    value = dn0v
                    setattr(self, name, value)

                # value_set, set of all values, lowercase
                if isinstance(value, list):
                    value_set = set(v.lower() for v in value)
                else:
                    value_set = set([value.lower()])

                # dn attribute must match the dn
                if dn0v.lower() not in value_set:
                    raise ValueError(
                        "value of %r is %r does not include %r from dn %r" %
                        (name, value, dn0v, self._dn))

            # if objectClass not given, try to set it, otherwise just convert
            # value
            elif name.lower() == 'objectclass':
                assert isinstance(value, list)
                value = default_object_class
                setattr(self, name, default_object_class)

            # db value should always be a list
            value = field.to_db(value)
            assert isinstance(value, list)
            moddict[name] = value

        return moddict

    def _add(self):
        """ Add this object to the directory as a new entry.

        :raises AlreadyExists: If the server reports that the entry already
            exists.
        """
        # generate moddict values
        moddict = self._get_moddict()

        # turn moddict into a modlist
        modlist = tldap.modlist.addModlist(moddict)

        # what database should we be using?
        using = self._alias
        assert using is not None
        c = tldap.connections[using]

        # what to do if transaction is reversed
        def onfailure():
            self._alias = None
            self._db_values = None

        # do it
        try:
            c.add(self._dn, modlist, onfailure)
        except ldap3.core.exceptions.LDAPEntryAlreadyExistsResult:
            raise self.AlreadyExists(
                "Object with dn %r already exists doing add" % (self._dn,))

        # save new values
        self._alias = using
        self._db_values = tldap.helpers.CaseInsensitiveDict(moddict)

    _add.alters_data = True

    def _modify(self):
        """ Write the difference between the cached db values and the current
        attribute values to the directory.

        Fields named in ``force_replace`` are left out of the diff and are
        instead sent as an explicit replace that cannot be rolled back.

        :raises DoesNotExist: If the server reports that the entry does not
            exist.
        """
        # what database should we be using?
        using = self._alias
        assert using is not None
        c = tldap.connections[using]

        # dictionary of old values
        modold = {}

        # generate modold values
        fields = self._meta.fields
        for field in fields:
            name = field.name
            modold[name] = self._db_values.get(name, [])

        # generate moddict values
        moddict = self._get_moddict()

        # remove items in force_replace
        force_value = {}
        for field in self.force_replace:
            force_value[field] = moddict[field]
            del modold[field]
            del moddict[field]
        self.force_replace = set()

        # turn moddict into a modlist
        modlist = tldap.modlist.modifyModlist(modold, moddict)

        # FIXME: recheck
        # add items in force_replace
        force_modlist = {}
        for field, value in six.iteritems(force_value):
            force_modlist[field] = (
                ldap3.MODIFY_REPLACE, tldap.modlist.escape_list(value))
            # put the value back so it ends up in the cached db values
            moddict[field] = value

        # what to do if transaction is reversed
        old_values = self._db_values

        def onfailure():
            self._db_values = old_values

        # do it
        if len(modlist) > 0:
            try:
                c.modify(self._dn, modlist, onfailure)
            except ldap3.core.exceptions.LDAPNoSuchObjectResult:
                raise self.DoesNotExist(
                    "Object with dn %r doesn't already exist doing modify" %
                    (self._dn,))

        # we can't rollback these values
        if len(force_modlist) > 0:
            try:
                c.modify_no_rollback(self._dn, force_modlist)
            except ldap3.core.exceptions.LDAPNoSuchObjectResult:
                raise self.DoesNotExist(
                    "Object with dn %r doesn't already exist doing modify" %
                    (self._dn,))

        # save new values
        self._db_values = tldap.helpers.CaseInsensitiveDict(moddict)

    _modify.alters_data = True

    def rename(self, new_base_dn=None, **kwargs):
        """
        Rename this entry. Use like object.rename(uid="new") or
        object.rename(cn="new").

        :param self: object to rename.
        :param new_base_dn: move entry to this parent.
        :param kwargs: Contains new rdn of object; at most one item. With no
            item the current rdn is kept (useful together with new_base_dn).
        :raises ValueError: If more than one rdn is given.
        """
        # extract key and value from kwargs
        if len(kwargs) == 1:
            name, value = list(six.iteritems(kwargs))[0]

            # replace pk with the real attribute
            if name == "pk":
                name = self._meta.pk

            # work out the new rdn of the object
            split_new_rdn = [[(name, value, 1)]]
        elif len(kwargs) == 0:
            split_new_rdn = [tldap.dn.str2dn(self._dn)[0]]
        else:
            # an assert would vanish under -O and leave split_new_rdn unbound
            raise ValueError(
                "rename() accepts at most one rdn keyword argument")

        new_rdn = tldap.dn.dn2str(split_new_rdn)

        using = self._alias
        assert using is not None

        # do the rename; this still needs the old dn in self._dn
        self._rename(new_rdn, new_base_dn)

        # construct new dn: new rdn followed by the new parent if the entry
        # was moved, otherwise by the existing parent
        tmplist = []
        tmplist.append(split_new_rdn[0])
        if new_base_dn is None:
            tmplist.extend(tldap.dn.str2dn(self._dn)[1:])
        else:
            tmplist.extend(tldap.dn.str2dn(new_base_dn))

        self._dn = tldap.dn.dn2str(tmplist)

    rename.alters_data = True

    def _rename(self, new_rdn, new_base_dn):
        """
        Low level rename to new_rdn for the using connection. Doesn't update
        the dn unless operation is reversed during a commit.

        Besides issuing the rename this updates the old and new rdn
        attributes on the object and in the cached db values.

        :param new_rdn: New rdn, as a string.
        :param new_base_dn: New parent passed through to the connection; may
            be None.
        """
        using = self._alias
        assert using is not None
        c = tldap.connections[using]

        # what to do if transaction is reversed
        # we need to reset cached data and the dn
        old_dn = self._dn
        old_values = self._db_values

        def onfailure():
            self._dn = old_dn
            self._db_values = old_values

        # do the rename
        c.rename(self._dn, new_rdn, new_base_dn, onfailure)

        # get old rdn values
        split_old_dn = tldap.dn.str2dn(self._dn)
        old_key, old_value, _ = split_old_dn[0][0]

        # get new rdn values
        split_new_rdn = tldap.dn.str2dn(new_rdn)
        new_key, new_value, _ = split_new_rdn[0][0]

        # make a copy before modifications, so that old_values captured by
        # onfailure above is left untouched
        self._db_values = copy.deepcopy(self._db_values)

        # delete old rdn attribute in object
        old_key = self._meta.get_field_name(old_key)
        field = self._meta.get_field_by_name(old_key)
        v = getattr(self, old_key, [])
        if v is None:
            pass
        elif isinstance(v, list):
            if old_value in v:
                v.remove(old_value)
        elif old_value == v:
            v = None
        if v is None:
            del self._db_values[old_key]
        elif isinstance(v, list) and len(v) == 0:
            del self._db_values[old_key]
        else:
            self._db_values[old_key] = field.to_db(v)
        setattr(self, old_key, v)

        # update new rdn attribute in object
        new_key = self._meta.get_field_name(new_key)
        field = self._meta.get_field_by_name(new_key)
        v = getattr(self, new_key, None)
        if v is None:
            v = new_value
        elif isinstance(v, list):
            if new_value not in v:
                v.append(new_value)
        elif v != new_value:
            # we can't add a value to a string
            assert False
        self._db_values[new_key] = field.to_db(v)
        setattr(self, new_key, v)

    _rename.alters_data = True

    def _delete(self):
        """ Delete the entry from the directory and drop the cached db
        values; the cache is restored if the transaction is reversed. """
        # what database should we be using?
        using = self._alias
        assert using is not None
        c = tldap.connections[using]

        # what to do if transaction is reversed
        old_values = self._db_values

        def onfailure():
            self._db_values = old_values

        # delete it
        c.delete(self._dn, onfailure)
        self._db_values = None

    _delete.alters_data = True

    def unparse(self, ldif_writer, new_dn=None, extra_fields=None):
        """ Translate object into ldif.

        :param self: object to translate.
        :param ldif_writer: ldif_writer to write translation to.
        :param new_dn: dn to write instead of the object's own dn.
        :param extra_fields: extra fields to display
        :return: Whatever ``ldif_writer.unparse`` returns.
        """
        # generate moddict values
        moddict = self._get_moddict()
        if extra_fields:
            moddict.update(extra_fields)

        if new_dn is not None:
            dn = new_dn
        else:
            dn = self.dn

        # do stuff
        return ldif_writer.unparse(dn, moddict)