source: main/branches/3D/openPLM/plmapp/tasks.py @ 595

Revision 595, 2.0 KB checked in by pcosquer, 8 years ago (diff)

3D branch: merge changes from trunk rev 594

Line 
1###########################
2# adapted from https://github.com/mixcloud/django-celery-haystack-SearchIndex/
3# by sdcooke
4
5from functools import wraps
6
7from django.db.models.loading import get_model
8
9from haystack import site
10
11from celery.task import task
12
13def synchronized(cls=None, lock=None):
14    """Class decorator to synchronize execution of a task's run method.
15
16    This prevents parallel execution of two instances of the same task within
17    the same worker. If an instance of the same task is running in the same
18    worker, the second invocation blocks until the first one completes.
19
20    Note that this applies to the task class, so `@synchronized` should
21    appear before `@task` or `@periodic_task` when tasks are defined with
22    decorators.
23
24    .. code-block:: python
25
26        @synchronized
27        @task
28        def cleanup_database(**kwargs):
29            logger = cleanup_database.get_logger(**kwargs)
30            logger.warn("Task running...")
31    """
32    from multiprocessing import Lock
33    cls.lock = lock or Lock()
34    cls.unsynchronized_run = cls.run
35    @wraps(cls.unsynchronized_run)
36    def wrapper(*args, **kwargs):
37        with cls.lock:
38            cls.unsynchronized_run(*args, **kwargs)
39    cls.run = wrapper
40    return cls
41
42@synchronized
43@task(default_retry_delay = 60, max_retries = 10)
44def update_index(app_name, model_name, pk, **kwargs):
45    model_class = get_model(app_name, model_name)
46    instance = model_class.objects.get(pk=pk)
47    search_index = site.get_index(model_class)
48    search_index.update_object(instance)
49
50@task(default_retry_delay = 60, max_retries = 10)
51def update_indexes(instances):
52    for app_name, model_name, pk in instances:
53        model_class = get_model(app_name, model_name)
54        instance = model_class.objects.get(pk=pk)
55        search_index = site.get_index(model_class)
56        search_index.update_object(instance)
57update_indexes = synchronized(update_indexes, update_index.lock)
58
59@task
60def add(a, b):
61    u"""Simple task, to test the queue ;-)"""
62    return a + b
63
Note: See TracBrowser for help on using the repository browser.