##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Unique id utility.
This utility assigns unique integer ids to objects and allows lookups
by object and by id.
This functionality can be used in cataloging.
"""
import random
import BTrees
from persistent import Persistent
from zope.component import adapter, getAllUtilitiesRegisteredFor, subscribers
from zope.event import notify
from zope.interface import implementer
from zope.keyreference.interfaces import IKeyReference, NotYet
from zope.lifecycleevent.interfaces import IObjectAddedEvent
from zope.lifecycleevent.interfaces import IObjectRemovedEvent
from zope.location.interfaces import ILocation
from zope.location.interfaces import IContained
from zope.security.proxy import removeSecurityProxy
from zope.intid.interfaces import IIntIds, IIntIdEvent
from zope.intid.interfaces import IntIdAddedEvent, IntIdRemovedEvent
@implementer(IIntIds, IContained)
[docs]class IntIds(Persistent):
"""This utility provides a two way mapping between objects and
integer ids.
IKeyReferences to objects are stored in the indexes.
"""
__parent__ = __name__ = None
_v_nextid = None
_randrange = random.randrange
family = BTrees.family32
def __init__(self, family=None):
if family is not None:
self.family = family
self.ids = self.family.OI.BTree()
self.refs = self.family.IO.BTree()
def __len__(self):
return len(self.ids)
def items(self):
return list(self.refs.items())
def __iter__(self):
return self.refs.iterkeys()
def getObject(self, id):
return self.refs[id]()
def queryObject(self, id, default=None):
r = self.refs.get(id)
if r is not None:
return r()
return default
def getId(self, ob):
try:
key = IKeyReference(ob)
except (NotYet, TypeError, ValueError):
raise KeyError(ob)
try:
return self.ids[key]
except KeyError:
raise KeyError(ob)
def queryId(self, ob, default=None):
try:
return self.getId(ob)
except KeyError:
return default
def _generateId(self):
"""Generate an id which is not yet taken.
This tries to allocate sequential ids so they fall into the
same BTree bucket, and randomizes if it stumbles upon a
used one.
"""
nextid = getattr(self, '_v_nextid', None)
while True:
if nextid is None:
nextid = self._randrange(0, self.family.maxint)
uid = nextid
if uid not in self.refs:
nextid += 1
if nextid > self.family.maxint:
nextid = None
self._v_nextid = nextid
return uid
nextid = None
def register(self, ob):
# Note that we'll still need to keep this proxy removal.
ob = removeSecurityProxy(ob)
key = IKeyReference(ob)
if key in self.ids:
return self.ids[key]
uid = self._generateId()
self.refs[uid] = key
self.ids[key] = uid
return uid
def unregister(self, ob):
# Note that we'll still need to keep this proxy removal.
ob = removeSecurityProxy(ob)
key = IKeyReference(ob, None)
if key is None:
return
uid = self.ids[key]
del self.refs[uid]
del self.ids[key]
@adapter(ILocation, IObjectRemovedEvent)
[docs]def removeIntIdSubscriber(ob, event):
"""A subscriber to ObjectRemovedEvent
Removes the unique ids registered for the object in all the unique
id utilities.
"""
utilities = tuple(getAllUtilitiesRegisteredFor(IIntIds))
if utilities:
key = IKeyReference(ob, None)
# Register only objects that adapt to key reference
if key is not None:
# Notify the catalogs that this object is about to be removed.
notify(IntIdRemovedEvent(ob, event))
for utility in utilities:
try:
utility.unregister(key)
except KeyError:
pass
@adapter(ILocation, IObjectAddedEvent)
[docs]def addIntIdSubscriber(ob, event):
"""A subscriber to ObjectAddedEvent
Registers the object added in all unique id utilities and fires
an event for the catalogs.
"""
utilities = tuple(getAllUtilitiesRegisteredFor(IIntIds))
if utilities: # assert that there are any utilites
key = IKeyReference(ob, None)
# Register only objects that adapt to key reference
if key is not None:
idmap = {}
for utility in utilities:
idmap[utility] = utility.register(key)
# Notify the catalogs that this object was added.
notify(IntIdAddedEvent(ob, event, idmap))
@adapter(IIntIdEvent)
[docs]def intIdEventNotify(event):
"""Event subscriber to dispatch IntIdEvent to interested adapters."""
adapters = subscribers((event.object, event), None)
for adapter in adapters:
pass # getting them does the work