source: main/branches/1.0/openPLM/plmapp/controllers/plmobject.py @ 1114

Revision 1114, 21.8 KB checked in by pcosquer, 8 years ago (diff)

fixes #122

Line 
1############################################################################
2# openPLM - open source PLM
3# Copyright 2010 Philippe Joulaud, Pierre Cosquer
4#
5# This file is part of openPLM.
6#
7#    openPLM is free software: you can redistribute it and/or modify
8#    it under the terms of the GNU General Public License as published by
9#    the Free Software Foundation, either version 3 of the License, or
10#    (at your option) any later version.
11#
12#    openPLM is distributed in the hope that it will be useful,
13#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15#    GNU General Public License for more details.
16#
17#    You should have received a copy of the GNU General Public License
18#    along with openPLM.  If not, see <http://www.gnu.org/licenses/>.
19#
20# Contact :
21#    Philippe Joulaud : ninoo.fr@gmail.com
22#    Pierre Cosquer : pierre.cosquer@insa-rennes.fr
23################################################################################
24
25"""
26"""
27
28import datetime
29import re
30
31from django.conf import settings
32from django.core.exceptions import ObjectDoesNotExist
33
34import openPLM.plmapp.models as models
35from openPLM.plmapp.exceptions import RevisionError, PermissionError,\
36    PromotionError
37from openPLM.plmapp.utils import level_to_sign_str
38from openPLM.plmapp.controllers.base import Controller
39
# Matches text that is rejected in a reference or a revision: any of the
# characters ``?``, ``/``, ``#``, newline, tab, carriage return, form feed,
# or the two-character sequence ``..``.
rx_bad_ref = re.compile(r"[?/#\n\t\r\f]|\.\.")
class PLMObjectController(Controller):
    u"""
    Controller used to manage a :class:`~plmapp.models.PLMObject` and to
    record its modifications in a history.

    :attributes:
        .. attribute:: object

            The :class:`.PLMObject` managed by the controller
        .. attribute:: _user

            :class:`~django.contrib.auth.models.User` who modifies ``object``

    :param obj: managed object
    :type obj: a subinstance of :class:`.PLMObject`
    :param user: user who modifies *obj*
    :type user: :class:`~django.contrib.auth.models.User`
    """

    # History model associated with this controller.
    # NOTE(review): presumably read by the base Controller when it records
    # history entries -- confirm against controllers/base.py.
    HISTORY = models.History
61
62    @classmethod
63    def create(cls, reference, type, revision, user, data={}, block_mails=False,
64            no_index=False):
65        u"""
66        This method builds a new :class:`.PLMObject` of
67        type *class_* and return a :class:`PLMObjectController` associated to
68        the created object.
69
70        Raises :exc:`ValueError` if *reference*, *type* or *revision* are
71        empty. Raises :exc:`ValueError` if *type* is not valid.
72
73        :param reference: reference of the objet
74        :param type: type of the object
75        :param revision: revision of the object
76        :param user: user who creates/owns the object
77        :param data: a dict<key, value> with informations to add to the plmobject
78        :rtype: :class:`PLMObjectController`
79        """
80       
81        profile = user.get_profile()
82        if not (profile.is_contributor or profile.is_administrator):
83            raise PermissionError("%s is not a contributor" % user)
84        if not reference or not type or not revision:
85            raise ValueError("Empty value not permitted for reference/type/revision")
86        if rx_bad_ref.search(reference) or rx_bad_ref.search(revision):
87            raise ValueError("Reference or revision contains a '/' or a '..'")
88        try:
89            class_ = models.get_all_plmobjects()[type]
90        except KeyError:
91            raise ValueError("Incorrect type")
92        # create an object
93        obj = class_(reference=reference, type=type, revision=revision,
94                     owner=user, creator=user)
95        if no_index:
96            obj.no_index = True
97        if data:
98            for key, value in data.iteritems():
99                if key not in ["reference", "type", "revision"]:
100                    setattr(obj, key, value)
101        obj.state = models.get_default_state(obj.lifecycle)
102        obj.save()
103        res = cls(obj, user)
104        if block_mails:
105            res.block_mails()
106        # record creation in history
107        infos = {"type" : type, "reference" : reference, "revision" : revision}
108        infos.update(data)
109        details = u",".join(u"%s : %s" % (k, v) for k, v in infos.items())
110        res._save_histo("Create", details)
111        # add links
112        models.PLMObjectUserLink.objects.create(plmobject=obj, user=user, role="owner")
113        try:
114            l = models.DelegationLink.objects.get(delegatee=user,
115                    role=models.ROLE_SPONSOR)
116            sponsor = l.delegator
117            if sponsor.username == settings.COMPANY:
118                sponsor = user
119            if not res.check_in_group(sponsor, False):
120                sponsor = user
121        except models.DelegationLink.DoesNotExist:
122            sponsor = user
123        # the user can promote to the next state
124        models.PLMObjectUserLink.objects.create(plmobject=obj, user=user,
125                                                 role=level_to_sign_str(0))
126        # from the next state, only the sponsor can promote this object
127        for i in range(1, obj.lifecycle.nb_states - 1):
128            models.PLMObjectUserLink.objects.create(plmobject=obj, user=sponsor,
129                                                    role=level_to_sign_str(i))
130
131        res._update_state_history()
132        return res
133       
134    @classmethod
135    def create_from_form(cls, form, user, block_mails=False, no_index=False):
136        u"""
137        Creates a :class:`PLMObjectController` from *form* and associates *user*
138        as the creator/owner of the PLMObject.
139       
140        This method raises :exc:`ValueError` if *form* is invalid.
141
142        :param form: a django form associated to a model
143        :param user: user who creates/owns the object
144        :rtype: :class:`PLMObjectController`
145        """
146        if form.is_valid():
147            ref = form.cleaned_data["reference"]
148            type = form.Meta.model.__name__
149            rev = form.cleaned_data["revision"]
150            obj = cls.create(ref, type, rev, user, form.cleaned_data,
151                    block_mails, no_index)
152            return obj
153        else:
154            raise ValueError("form is invalid")
155   
156    def promote(self):
157        u"""
158        Promotes :attr:`object` in his lifecycle and writes his promotion in
159        the history
160       
161        :raise: :exc:`.PromotionError` if :attr:`object` is not promotable
162        :raise: :exc:`.PermissionError` if the use can not sign :attr:`object`
163        """
164        if self.object.is_promotable():
165            state = self.object.state
166            lifecycle = self.object.lifecycle
167            lcl = lifecycle.to_states_list()
168            self.check_permission(level_to_sign_str(lcl.index(state.name)))
169            try:
170                new_state = lcl.next_state(state.name)
171                self.object.state = models.State.objects.get_or_create(name=new_state)[0]
172                self.object.save()
173                details = "change state from %(first)s to %(second)s" % \
174                                     {"first" :state.name, "second" : new_state}
175                self._save_histo("Promote", details, roles=["sign_"])
176                if self.object.state == lifecycle.official_state:
177                    self._officialize()
178                self._update_state_history()
179            except IndexError:
180                # FIXME raises it ?
181                pass
182        else:
183            raise PromotionError()
184
185    def _officialize(self):
186        """ Officialize the object (called by :meth:`promote`)."""
187        # changes the owner to the company
188        cie = models.User.objects.get(username=settings.COMPANY)
189        self.set_owner(cie, True)
190        for rev in self.get_previous_revisions():
191            if rev.is_cancelled:
192                # nothing to do
193                pass
194            else:
195                if rev.is_editable:
196                    ctrl = type(self)(rev.get_leaf_object(), self._user)
197                    ctrl.cancel()
198                elif rev.is_official:
199                    ctrl = type(self)(rev.get_leaf_object(), self._user)
200                    ctrl._deprecate()
201
202    def _update_state_history(self):
203        """ Updates the :class:`.StateHistory` table of the object."""
204        now = datetime.datetime.now()
205        try:
206            # ends previous StateHistory if it exists
207            # here we do not try to see if the state has not changed since
208            # we are sure it is not the case and it would not be a problem
209            # if it has not changed
210            sh = models.StateHistory.objects.get(plmobject__id=self.object.id,
211                    end_time=None)
212            sh.end_time = now
213            sh.save()
214        except models.StateHistory.DoesNotExist:
215            pass
216        models.StateHistory.objects.create(plmobject=self.object,
217                start_time=now, end_time=None, state=self.state,
218                lifecycle=self.lifecycle)
219
220    def _deprecate(self):
221        """ Deprecate the object. """
222        cie = models.User.objects.get(username=settings.COMPANY)
223        self.state = self.lifecycle.last_state
224        self.set_owner(cie, True)
225        self.save()
226        self._update_state_history()
227
228    def demote(self):
229        u"""
230        Demotes :attr:`object` in his lifecycle and writes his demotion in the
231        history
232       
233        :raise: :exc:`.PermissionError` if the use can not sign :attr:`object`
234        """
235        if not self.is_proposed:
236            raise PromotionError()
237        state = self.object.state
238        lifecycle = self.object.lifecycle
239        lcl = lifecycle.to_states_list()
240        try:
241            new_state = lcl.previous_state(state.name)
242            self.check_permission(level_to_sign_str(lcl.index(new_state)))
243            self.object.state = models.State.objects.get_or_create(name=new_state)[0]
244            self.object.save()
245            details = "change state from %(first)s to %(second)s" % \
246                    {"first" :state.name, "second" : new_state}
247            self._save_histo("Demote", details, roles=["sign_"])
248            self._update_state_history()
249        except IndexError:
250            # FIXME raises it ?
251            pass
252
253    def _save_histo(self, action, details, blacklist=(), roles=(), users=()):
254        """
255        Records *action* with details *details* made by :attr:`_user` in
256        on :attr:`object` in the histories table.
257
258        *blacklist*, if given, should be a list of email whose no mail should
259        be sent (empty by default).
260
261        A mail is sent to all notified users. Moreover, more roles can be
262        notified by settings the *roles" argument.
263        """
264        roles = ["notified"] + list(roles)
265        super(PLMObjectController, self)._save_histo(action, details,
266                blacklist, roles, users)
267
268    def has_permission(self, role):
269        if role == models.ROLE_OWNER and self.owner == self._user:
270            return True
271        if self.plmobjectuserlink_plmobject.filter(user=self._user, role=role).exists():
272            return True
273
274        users = models.DelegationLink.get_delegators(self._user, role)
275        qset = self.plmobjectuserlink_plmobject.filter(user__in=users,
276                                                          role=role)
277        return qset.exists()
278
279    def check_editable(self):
280        """
281        Raises a :exc:`.PermissionError` if :attr:`object` is not editable.
282        """
283        if not self.object.is_editable:
284            raise PermissionError("The object is not editable")
285
286    def check_in_group(self, user, raise_=True):
287        if user.username == settings.COMPANY:
288            return True
289        if not self.group.user_set.filter(id=user.id).exists():
290            if raise_:
291                raise PermissionError("The user %s does not belong to the group." % user.username)
292            else:
293                return False
294        return True
295
296    def revise(self, new_revision):
297        u"""
298        Makes a new revision: duplicates :attr:`object`. The duplicated
299        object's revision is *new_revision*.
300
301        Returns a controller of the new object.
302        """
303        # TODO: changes the group
304        self.check_readable()
305        if not new_revision or new_revision == self.revision or \
306           rx_bad_ref.search(new_revision):
307            raise RevisionError("Bad value for new_revision")
308        if self.is_cancelled or self.is_deprecated:
309            raise RevisionError("Object is deprecated or cancelled.")
310        if models.RevisionLink.objects.filter(old=self.object.pk).exists():
311            raise RevisionError("A revision already exists for %s" % self.object)
312        data = {}
313        fields = self.get_modification_fields() + self.get_creation_fields()
314        for attr in fields:
315            if attr not in ("reference", "type", "revision"):
316                data[attr] = getattr(self.object, attr)
317        data["state"] = models.get_default_state(self.lifecycle)
318        new_controller = self.create(self.reference, self.type, new_revision,
319                                     self._user, data)
320        details = "old : %s, new : %s" % (self.object, new_controller.object)
321        self._save_histo(models.RevisionLink.ACTION_NAME, details)
322        models.RevisionLink.objects.create(old=self.object, new=new_controller.object)
323        return new_controller
324
325    def is_revisable(self, check_user=True):
326        """
327        Returns True if :attr:`object` is revisable: if :meth:`revise` can be
328        called safely.
329
330        If *check_user* is True (the default), it also checks if :attr:`_user` can
331        see the object.
332        """
333        # a cancelled or deprecated object cannot be revised.
334        if self.is_cancelled or self.is_deprecated:
335            return False
336       
337        # objects.get fails if a link does not exist
338        # we can revise if any links exist
339        try:
340            models.RevisionLink.objects.get(old=self.object.pk)
341            return False
342        except ObjectDoesNotExist:
343            if check_user:
344                return self.check_readable(False)
345            else:
346                return True
347   
348    def get_previous_revisions(self):
349        try:
350            link = models.RevisionLink.objects.get(new=self.object.pk)
351            controller = type(self)(link.old, self._user)
352            return controller.get_previous_revisions() + [link.old]
353        except ObjectDoesNotExist:
354            return []
355
356    def get_next_revisions(self):
357        try:
358            link = models.RevisionLink.objects.get(old=self.object.pk)
359            controller = type(self)(link.new, self._user)
360            return [link.new] + controller.get_next_revisions()
361        except ObjectDoesNotExist:
362            return []
363
364    def get_all_revisions(self):
365        """
366        Returns a list of all revisions, ordered from less recent to most recent
367       
368        :rtype: list of :class:`.PLMObject`
369        """
370        return self.get_previous_revisions() + [self.object] +\
371               self.get_next_revisions()
372
373    def set_owner(self, new_owner, dirty=False):
374        """
375        Sets *new_owner* as current owner.
376
377        .. note::
378            This method does **NOT** check that the current user
379            is the owner of the object. :meth:`set_role` does that check.
380       
381        :param new_owner: the new owner
382        :type new_owner: :class:`~django.contrib.auth.models.User`
383        :param dirty: True if set_owner should skip sanity checks and
384                      should not send a mail (usefull for tests, default is
385                      False)
386        :raise: :exc:`.PermissionError` if *new_owner* is not a contributor
387        :raise: :exc:`ValueError` if *new_owner* is the company and the
388                object is editable
389        """
390       
391        if not dirty:
392            self.check_contributor(new_owner)
393            self.check_in_group(new_owner)
394            if new_owner.username == settings.COMPANY:
395                if self.is_editable:
396                    raise ValueError("The company cannot own an editable object.")
397
398        links = models.PLMObjectUserLink.objects.filter(plmobject=self.object,
399                role="owner")
400        links.delete()
401        models.PLMObjectUserLink.objects.create(user=new_owner,
402               plmobject=self.object, role="owner")
403        if dirty:
404            self.object.owner = new_owner
405            self.object.save()
406        else:
407            self.owner = new_owner
408            self.save()
409        # we do not need to write this event in an history since save() has
410        # already done it
411
412    def add_notified(self, new_notified):
413        """
414        Adds *new_notified* to the list of users notified when :attr:`object`
415        changes.
416       
417        :param new_notified: the new user who would be notified
418        :type new_notified: :class:`~django.contrib.auth.models.User`
419        :raise: :exc:`IntegrityError` if *new_notified* is already notified
420            when :attr:`object` changes
421        """
422        if new_notified != self._user:
423            self.check_permission("owner")
424        self.check_in_group(new_notified)
425        models.PLMObjectUserLink.objects.create(plmobject=self.object,
426            user=new_notified, role="notified")
427        details = "user: %s" % new_notified
428        self._save_histo("New notified", details)
429
430    def remove_notified(self, notified):
431        """
432        Removes *notified* to the list of users notified when :attr:`object`
433        changes.
434       
435        :param notified: the user who would be no more notified
436        :type notified: :class:`~django.contrib.auth.models.User`
437        :raise: :exc:`ObjectDoesNotExist` if *notified* is not notified
438            when :attr:`object` changes
439        """
440       
441        if notified != self._user:
442            self.check_permission("owner")
443        link = models.PLMObjectUserLink.objects.get(plmobject=self.object,
444                user=notified, role="notified")
445        link.delete()
446        details = u"user: %s" % notified
447        self._save_histo("Notified removed", details)
448
449    def set_signer(self, signer, role):
450        """
451        Sets *signer* as current signer for *role*. *role* must be a valid
452        sign role (see :func:`.level_to_sign_str` to get a role from a
453        sign level (int))
454       
455        :param signer: the new signer
456        :type signer: :class:`~django.contrib.auth.models.User`
457        :param str role: the sign role
458        :raise: :exc:`.PermissionError` if *signer* is not a contributor
459        :raise: :exc:`.PermissionError` if *role* is invalid (level to high)
460        """
461
462        self.check_contributor(signer)
463        self.check_in_group(signer)
464       
465        # remove old signer
466        old_signer = None
467        try:
468            link = models.PLMObjectUserLink.objects.get(plmobject=self.object,
469               role=role)
470            old_signer = link.user
471            link.delete()
472        except ObjectDoesNotExist:
473            pass
474        # check if the role is valid
475        max_level = self.lifecycle.nb_states - 1
476        level = int(re.search(r"\d+", role).group(0))
477        if level > max_level:
478            # TODO better exception ?
479            raise PermissionError("bad role")
480        # add new signer
481        models.PLMObjectUserLink.objects.create(plmobject=self.object,
482                                                user=signer, role=role)
483        details = u"signer: %s, level : %d" % (signer, level)
484        if old_signer:
485            details += u", old signer: %s" % old_signer
486        self._save_histo("New signer", details)
487
488    def set_role(self, user, role):
489        """
490        Sets role *role* (like `owner` or `notified`) for *user*
491
492        .. note::
493            If *role* is `owner` or a sign role, the old user who had
494            this role will lose it. Only the owner can changes these
495            roles.
496
497            If *role* is notified, others roles are preserved.
498
499        :raise: :exc:`ValueError` if *role* is invalid
500        :raise: :exc:`.PermissionError` if *user* is not allowed to has role
501            *role*
502        """
503        if role == "owner":
504            self.check_permission("owner")
505            self.set_owner(user)
506        elif role == "notified":
507            self.add_notified(user)
508        elif role.startswith("sign"):
509            self.check_permission("owner")
510            self.set_signer(user, role)
511        else:
512            raise ValueError("bad value for role")
513
514    def check_permission(self, role, raise_=True):
515        if self._user.username == settings.COMPANY:
516            # the company is like a super user
517            return True
518        if not self.group.user_set.filter(id=self._user.id).exists():
519            if raise_:
520                raise PermissionError("action not allowed for %s" % self._user)
521            else:
522                return False
523        return super(PLMObjectController, self).check_permission(role, raise_)
524
525    def check_readable(self, raise_=True):
526        """
527        Returns ``True`` if the user can read (is allowed to) this object.
528
529        Raises a :exc:`.PermissionError` if the user cannot read the object
530        and *raise_* is ``True`` (the default).
531        """
532        if not self.is_editable:
533            return True
534        if self.is_cancelled:
535            return True
536        if self._user.username == settings.COMPANY:
537            # the company is like a super user
538            return True
539        if self.owner_id == self._user.id:
540            return True
541        if self.group.user_set.filter(id=self._user.id).exists():
542            return True
543        if raise_:
544            raise PermissionError("You can not see this object.")
545        return False
546
547    def cancel(self):
548        """
549        Cancels the object:
550
551            * Its lifecycle becomes "cancelled".
552            * Its owner becomes the company.
553            * It removes all signer.
554        """
555        company = models.User.objects.get(username=settings.COMPANY)
556        self.lifecycle = models.get_cancelled_lifecycle()
557        self.state = models.get_cancelled_state()
558        self.set_owner(company, True)
559        self.plmobjectuserlink_plmobject.filter(role__startswith=models.ROLE_SIGN).delete()
560        self.save(with_history=False)
561        self._save_histo("Cancel", "Object cancelled")
562        self._update_state_history()
563
Note: See TracBrowser for help on using the repository browser.