Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
53.16% covered (warning)
53.16%
42 / 79
18.18% covered (danger)
18.18%
2 / 11
CRAP
0.00% covered (danger)
0.00%
0 / 1
LongRunningWorker
53.16% covered (warning)
53.16%
42 / 79
18.18% covered (danger)
18.18%
2 / 11
115.40
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getLogContext
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
2
 run
40.62% covered (danger)
40.62%
13 / 32
0.00% covered (danger)
0.00%
0 / 1
13.54
 setMaxIterations
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
2.06
 isRunning
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
5.58
 registerSigHandlers
81.82% covered (warning)
81.82%
9 / 11
0.00% covered (danger)
0.00%
0 / 1
4.10
 shutdown
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 pauseProcessing
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 unPauseProcessing
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getWaitInterval
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
12
 hasEnoughSpace
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\taoTaskQueue\model;
23
24use oat\tao\model\taskQueue\QueueDispatcherInterface;
25use oat\tao\model\taskQueue\QueueInterface;
26use oat\tao\model\taskQueue\QueuerInterface;
27use oat\tao\model\taskQueue\Task\TaskInterface;
28use oat\tao\model\taskQueue\TaskLogInterface;
29use oat\tao\model\taskQueue\Worker\AbstractWorker;
30
/**
 * Worker that keeps pulling tasks from its queuer in a loop and processes them one by one.
 *
 * The loop ends when a shutdown signal (TERM/INT/QUIT) has been handled or, when a limit was
 * configured through setMaxIterations(), as soon as that limit is reached. While no limit is set
 * the worker runs until it is shut down.
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
final class LongRunningWorker extends AbstractWorker
{
    /** Base sleeping time in seconds: used while paused and as the unit of the dynamic wait time. */
    public const WAIT_INTERVAL = 1;

    /** Upper bound, in seconds, of the sleeping time when the worker is bound to a single queue. */
    public const MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE = 30;

    /** @var int Maximum number of iterations; 0 means the worker iterates without limit. */
    private $maxIterations = 0;

    /** @var int Number of non-paused loop iterations started so far. */
    private $iterations = 0;

    /** @var bool Set by the shutdown() signal handler; stops the loop at the next isRunning() check. */
    private $shutdown = false;

    /** @var bool Toggled by pauseProcessing() / unPauseProcessing(). */
    private $paused = false;

    /** @var int Number of consecutive iterations in which dequeue() returned nothing. */
    private $iterationsWithOutTask = 0;

    /** @var bool Whether signal handlers are installed and pending signals are dispatched. */
    private $handleSignals;

    /** @var bool Guards against installing the signal handlers more than once. */
    private $sigHandlersRegistered = false;

    /**
     * @param QueuerInterface  $queuer        Source of the tasks; forwarded to the parent constructor.
     * @param TaskLogInterface $taskLog       Forwarded to the parent constructor.
     * @param bool             $handleSignals When true, run() installs pcntl signal handlers.
     */
    public function __construct(QueuerInterface $queuer, TaskLogInterface $taskLog, $handleSignals = true)
    {
        parent::__construct($queuer, $taskLog);
        $this->handleSignals = $handleSignals;
    }

    /**
     * Builds the context array attached to every log entry written by this worker.
     *
     * @return array Process id and current iteration, plus the queue name when the queuer is a single queue.
     */
    protected function getLogContext()
    {
        $rs = [
            'PID' => getmypid(),
            'Iteration' => $this->iterations
        ];

        if ($this->queuer instanceof QueueInterface) {
            $rs['QueueName'] = $this->queuer->getName();
        }

        return $rs;
    }

    /**
     * Runs the processing loop until isRunning() reports false.
     *
     * Each non-paused iteration dequeues one item. An empty result puts the worker to sleep for
     * getWaitInterval() seconds; an item that is not a TaskInterface is logged and skipped; a real
     * task is handed over to processTask().
     *
     * @inheritdoc
     */
    public function run()
    {
        $this->registerSigHandlers();
        $this->logDebug('Starting LongRunningWorker.', $this->getLogContext());

        while ($this->isRunning()) {
            if ($this->paused) {
                // Paused iterations are not counted; signals are dispatched again by isRunning().
                $this->logInfo('Worker paused... ', $this->getLogContext());
                usleep(self::WAIT_INTERVAL * 1000000);
                continue;
            }

            ++$this->iterations;

            try {
                $this->logDebug('Fetching tasks from queue ', $this->getLogContext());

                $task = $this->queuer->dequeue();

                // if no task to process, sleep for the specified time and continue.
                if (!$task) {
                    ++$this->iterationsWithOutTask;
                    $waitInterval = $this->getWaitInterval();
                    $this->logDebug('No tasks found. Sleeping for ' . $waitInterval . ' sec', $this->getLogContext());
                    usleep($waitInterval * 1000000);

                    continue;
                }

                // we have task, so set this back to 0
                $this->iterationsWithOutTask = 0;

                if (!$task instanceof TaskInterface) {
                    // The item may be an array or an object without __toString(); concatenating it
                    // directly would raise an \Error that the catch below does not handle.
                    $this->logWarning(
                        'The received queue item (' . $this->describeQueueItem($task) . ') not processable.',
                        $this->getLogContext()
                    );

                    continue;
                }

                $this->processTask($task);

                unset($task);
            } catch (\Exception $e) {
                // Exceptions thrown by processTask() end up here as well and get the same message.
                // NOTE(review): there is no back-off on this path, so a queuer that fails on every
                // call is retried immediately - confirm whether a pause is wanted here.
                $this->logError(
                    'Fetching data from queue failed with MSG: ' . $e->getMessage(),
                    $this->getLogContext()
                );
            }
        }

        $this->logDebug('LongRunningWorker finished.', $this->getLogContext());
    }

    /**
     * Limits the number of iterations; only possible when the worker is bound to a single queue.
     *
     * The stored limit is the given value multiplied by the queue's getNumberOfTasksToReceive().
     *
     * @inheritdoc
     *
     * @throws \LogicException When the queuer is not a QueueInterface.
     */
    public function setMaxIterations($maxIterations)
    {
        if (!$this->queuer instanceof QueueInterface) {
            throw new \LogicException('Limit can be set only if a dedicated queue is set.');
        }

        $this->maxIterations = $maxIterations * $this->queuer->getNumberOfTasksToReceive();

        return $this;
    }

    /**
     * Dispatches pending signals (when signal handling is enabled) and tells whether the loop may go on.
     *
     * @return bool False after a shutdown request, or when an iteration limit is set and either the
     *              limit is reached or hasEnoughSpace() reports false; true otherwise.
     */
    private function isRunning(): bool
    {
        if ($this->handleSignals) {
            pcntl_signal_dispatch();
        }

        if ($this->shutdown) {
            return false;
        }

        if ($this->maxIterations > 0) {
            return $this->iterations < $this->maxIterations && $this->hasEnoughSpace();
        }

        return true;
    }

    /**
     * Register signal handlers that a worker should respond to.
     *
     * TERM/INT/QUIT: Shutdown after the current job is finished then exit.
     * USR2: Pause worker, no new jobs will be processed but the current one will be finished.
     * CONT: Resume worker.
     *
     * Does nothing when signal handling is disabled or the handlers are already installed.
     *
     * @throws \RuntimeException When the pcntl_signal() function is not available.
     */
    private function registerSigHandlers(): void
    {
        if ($this->handleSignals && !$this->sigHandlersRegistered) {
            if (!function_exists('pcntl_signal')) {
                $this->logError('Please make sure that "pcntl" is enabled.', $this->getLogContext());
                throw new \RuntimeException('Please make sure that "pcntl" is enabled.');
            }

            declare(ticks=1);

            pcntl_signal(SIGTERM, [$this, 'shutdown']);
            pcntl_signal(SIGINT, [$this, 'shutdown']);
            pcntl_signal(SIGQUIT, [$this, 'shutdown']);
            pcntl_signal(SIGUSR2, [$this, 'pauseProcessing']);
            pcntl_signal(SIGCONT, [$this, 'unPauseProcessing']);

            $this->sigHandlersRegistered = true;

            $this->logDebug('Finished setting up signal handlers', $this->getLogContext());
        }
    }

    /**
     * Handler registered for SIGTERM, SIGINT and SIGQUIT: flags the worker so that the loop in
     * run() stops at its next isRunning() check.
     */
    public function shutdown()
    {
        $this->logInfo('TERM/INT/QUIT received; shutting down gracefully...', $this->getLogContext());
        $this->shutdown = true;
    }

    /**
     * Handler registered for SIGUSR2: makes run() stop fetching new tasks until resumed.
     */
    public function pauseProcessing()
    {
        $this->logInfo('USR2 received; pausing task processing...', $this->getLogContext());
        $this->paused = true;
    }

    /**
     * Handler registered for SIGCONT: lets run() fetch tasks again.
     */
    public function unPauseProcessing()
    {
        $this->logInfo('CONT received; resuming task processing...', $this->getLogContext());
        $this->paused = false;
    }

    /**
     * Calculate the sleeping time dynamically in case of no task to work on.
     *
     * - Single queue: grows by WAIT_INTERVAL with every consecutive empty iteration, capped at
     *   MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE.
     * - Queue dispatcher: whatever the dispatcher's getWaitTime() returns, cast to int.
     * - Anything else: WAIT_INTERVAL.
     *
     * @return int (sec)
     */
    private function getWaitInterval(): int
    {
        if ($this->queuer instanceof QueueInterface) {
            $waitTime = $this->iterationsWithOutTask * self::WAIT_INTERVAL;

            return min($waitTime, self::MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE);
        }

        if ($this->queuer instanceof QueueDispatcherInterface) {
            return (int) $this->queuer->getWaitTime();
        }

        return self::WAIT_INTERVAL;
    }

    /**
     * Tells whether another dequeue still fits into the configured iteration limit.
     *
     * Only called from isRunning() when an iteration limit is set, which setMaxIterations() allows
     * for a single queue only.
     *
     * @return bool True when the last dequeue found nothing or the queuer reports pre-fetched
     *              messages; otherwise true only if the remaining iterations are at least the
     *              queue's getNumberOfTasksToReceive().
     */
    private function hasEnoughSpace(): bool
    {
        if ($this->iterationsWithOutTask || $this->queuer->hasPreFetchedMessages()) {
            return true;
        }

        $freeSpace = $this->maxIterations - $this->iterations;

        return $freeSpace >= $this->queuer->getNumberOfTasksToReceive();
    }

    /**
     * Produces a log-safe textual description of an arbitrary dequeued item.
     *
     * Scalars and objects implementing __toString() are rendered as their string value; any other
     * object is described by its class name and everything else by its type name, so that building
     * the log message can never fail on string conversion.
     *
     * @param mixed $item
     *
     * @return string
     */
    private function describeQueueItem($item): string
    {
        if (is_scalar($item) || (is_object($item) && method_exists($item, '__toString'))) {
            return (string) $item;
        }

        return is_object($item) ? get_class($item) : gettype($item);
    }
}