Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

#!/usr/bin/env python 

 

""" 

@file ion/core/process/worker.py 

@author Michael Meisinger 

@brief base class for a worker process 

""" 

 

from twisted.internet import defer 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

 

from ion.core.process.process import Process, ProcessFactory 

from ion.core.messaging.receiver import Receiver, WorkerReceiver, FanoutReceiver 

from ion.core.process.service_process import ServiceProcess 

import ion.util.procutils as pu 

 

class WorkerProcess(Process): 

    """ 

    Worker process 

    """ 

    @defer.inlineCallbacks 

    def plc_init(self): 

        msg_name = str(self.spawn_args['receiver-name']) 

        rec_type = str(self.spawn_args['receiver-type']) 

        scope = str(self.spawn_args['scope']) 

 

        if rec_type == 'worker': 

            self.workReceiver = WorkerReceiver( 

                label=__name__, 

                name=msg_name, 

                scope=scope, 

                handler=self.receive) 

        elif rec_type == 'fanout': 

            self.workReceiver = FanoutReceiver( 

                label=__name__, 

                name=msg_name, 

                scope=scope, 

                handler=self.receive) 

        else: 

            raise RuntimeError("Unknown receiver-type: "+str(rec_type)) 

 

        yield self.workReceiver.attach() 

 

    @defer.inlineCallbacks 

    def op_work(self, content, headers, msg): 

        yield self._work(content) 

        yield self.reply_ok(msg, {'work-id':content['work-id']}) 

 

    @defer.inlineCallbacks 

    def _work(self,content): 

        myid = self.proc_name + ":" + self.id.local 

        workid = str(content['work-id']) 

        waittime = float(content['work']) 

        log.info("worker="+myid+" job="+workid+" work="+str(waittime)) 

        yield pu.asleep(waittime) 

        log.info("worker="+myid+" job="+workid+" done at="+str(pu.currenttime_ms())) 

 

# Spawn of the process using the module name 

factory = ProcessFactory(WorkerProcess)