source: main/branches/3D/openPLM/plmapp/tasks.py @ 662

Revision 662, 2.2 KB checked in by pcosquer, 8 years ago (diff)

3D branch: merge changes from trunk (rev [661])

Line 
1###########################
2# adapted from https://github.com/mixcloud/django-celery-haystack-SearchIndex/
3# by sdcooke
4
5from functools import wraps
6
7from django.db.models.loading import get_model
8
9from celery.task import task
10
11def synchronized(cls=None, lock=None):
12    """Class decorator to synchronize execution of a task's run method.
13
14    This prevents parallel execution of two instances of the same task within
15    the same worker. If an instance of the same task is running in the same
16    worker, the second invocation blocks until the first one completes.
17
18    Note that this applies to the task class, so `@synchronized` should
19    appear before `@task` or `@periodic_task` when tasks are defined with
20    decorators.
21
22    .. code-block:: python
23
24        @synchronized
25        @task
26        def cleanup_database(**kwargs):
27            logger = cleanup_database.get_logger(**kwargs)
28            logger.warn("Task running...")
29    """
30    from multiprocessing import Lock
31    cls.lock = lock or Lock()
32    cls.unsynchronized_run = cls.run
33    @wraps(cls.unsynchronized_run)
34    def wrapper(*args, **kwargs):
35        with cls.lock:
36            cls.unsynchronized_run(*args, **kwargs)
37    cls.run = wrapper
38    return cls
39
40@synchronized
41@task(default_retry_delay = 60, max_retries = 10)
42def update_index(app_name, model_name, pk, **kwargs):
43    from haystack import site
44    import openPLM.plmapp.search_indexes
45
46    model_class = get_model(app_name, model_name)
47    instance = model_class.objects.select_related(depth=1).get(pk=pk)
48    search_index = site.get_index(model_class)
49    search_index.update_object(instance)
50
51@task(default_retry_delay = 60, max_retries = 10)
52def update_indexes(instances):
53    from haystack import site
54    import openPLM.plmapp.search_indexes
55
56    for app_name, model_name, pk in instances:
57        model_class = get_model(app_name, model_name)
58        instance = model_class.objects.select_related(depth=1).get(pk=pk)
59        search_index = site.get_index(model_class)
60        search_index.update_object(instance)
61update_indexes = synchronized(update_indexes, update_index.lock)
62
63@task
64def add(a, b):
65    u"""Simple task, to test the queue ;-)"""
66    return a + b
67
Note: See TracBrowser for help on using the repository browser.