
Author rhettinger
Recipients graingert, rhettinger, serhiy.storchaka, tim.peters, ztane
Date 2021-08-01.03:36:04
Antti Haapala, I agree that this situation is catastrophic and that we need some way to avoid blocking parallel calculations of cached values for distinct instances of the same class. 

Here's an idea that might possibly work.  Perhaps, hold one lock only briefly to atomically test and set a variable to track which instances are actively being updated.  

If another thread is performing the update, use a separate condition variable to wait for the update to complete.

If no other thread is doing the update, we don't need to hold a lock while performing the I/O bound underlying function.  And when we're done updating this specific instance, atomically update the set of instances being actively updated and notify threads waiting on the condition variable to wake up.

The key idea is to hold the lock only for variable updates (which are fast) rather than for the duration of the underlying function call (which is slow).  Only when this specific instance is being updated do we use a separate lock (wrapped in a  condition variable) to block until the slow function call is complete.
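To make the pattern concrete, here is a minimal standalone sketch of the same idea (the names get_or_compute, _in_progress, and _results are hypothetical and not part of any patch; it also simplifies by using a single Condition for both the brief bookkeeping lock and the waiting):

from threading import Condition

_cv = Condition()            # guards the bookkeeping below and supports waiting
_in_progress = set()         # keys whose values are currently being computed
_results = {}                # completed results

def get_or_compute(key, compute):
    with _cv:                # held only briefly for bookkeeping
        if key in _results:
            return _results[key]
        if key in _in_progress:
            # Another thread is already computing this key; wait for it.
            while key not in _results:
                _cv.wait()
            return _results[key]
        _in_progress.add(key)            # claim the update for this thread
    val = compute(key)                   # slow call runs with no lock held
    with _cv:
        _results[key] = val
        _in_progress.discard(key)
        _cv.notify_all()                 # wake any threads waiting on this key
    return val
    # (Like the sketch below, this doesn't yet handle an exception in compute().)

The full cached_property sketch below follows the same shape, but keeps the bookkeeping lock and the condition variable separate.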

The logic is hairy, so I've added Serhiy and Tim to the nosy list to help think it through.

--- Untested sketch ---------------------------------

from threading import Condition, RLock

_NOT_FOUND = object()

class cached_property:

    def __init__(self, func):
        self.func = func
        self.attrname = None
        self.__doc__ = func.__doc__
        self.update_lock = RLock()              # held only briefly for bookkeeping
        self.instance_being_updated = set()     # instances with an update in progress
        self.updated = Condition(RLock())       # waited on by threads needing that result

    def __set_name__(self, owner, name):
        if self.attrname is None:
            self.attrname = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        if self.attrname is None:
            raise TypeError(
                "Cannot use cached_property instance without calling __set_name__ on it.")
        try:
            cache = instance.__dict__
        except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
            msg = (
                f"No '__dict__' attribute on {type(instance).__name__!r} "
                f"instance to cache {self.attrname!r} property."
            )
            raise TypeError(msg) from None
        val = cache.get(self.attrname, _NOT_FOUND)
        if val is not _NOT_FOUND:
            return val

        # Now we need to either call the function or wait for another thread to do it
        with self.update_lock:
            # Invariant: no more than one thread can report
            # that the instance is actively being updated
            other_thread_is_updating = instance in self.instance_being_updated
            if not other_thread_is_updating:
                self.instance_being_updated.add(instance)

        # ONLY if this is the EXACT instance being updated
        # will we block and wait for the computed result.
        # Other instances won't have to wait.
        if other_thread_is_updating:
            with self.updated:
                while instance in self.instance_being_updated:
                    self.updated.wait()
            return cache[self.attrname]

        # Let this thread do the update
        val = self.func(instance)
        try:
            cache[self.attrname] = val
        except TypeError:
            msg = (
                f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                f"does not support item assignment for caching {self.attrname!r} property."
            )
            raise TypeError(msg) from None
        with self.update_lock:
            self.instance_being_updated.discard(instance)
        with self.updated:
            self.updated.notify_all()
        return val
        # XXX What to do about waiting threads when an exception occurs?
        # We don't want them to hang.  Should they repeat the underlying
        # function call to get their own exception to propagate upwards?
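
For example, usage along these lines is what the sketch is meant to make cheap (the Site class and its page property are hypothetical, purely for illustration):

import threading, time

class Site:
    def __init__(self, url):
        self.url = url

    @cached_property
    def page(self):
        time.sleep(1.0)          # stand-in for a slow, I/O bound computation
        return f'contents of {self.url}'

a = Site('https://example.com/a')
b = Site('https://example.com/b')
threads = [threading.Thread(target=lambda s=s: s.page) for s in (a, b, a)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# The distinct instances 'a' and 'b' should be computed in parallel rather
# than serially, and the two threads touching 'a' should trigger only one
# call to the underlying function.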