Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
53.16% |
42 / 79 |
|
18.18% |
2 / 11 |
CRAP | |
0.00% |
0 / 1 |
LongRunningWorker | |
53.16% |
42 / 79 |
|
18.18% |
2 / 11 |
115.40 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
getLogContext | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
2 | |||
run | |
40.62% |
13 / 32 |
|
0.00% |
0 / 1 |
13.54 | |||
setMaxIterations | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
2.06 | |||
isRunning | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
5.58 | |||
registerSigHandlers | |
81.82% |
9 / 11 |
|
0.00% |
0 / 1 |
4.10 | |||
shutdown | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
pauseProcessing | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
unPauseProcessing | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getWaitInterval | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
12 | |||
hasEnoughSpace | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
3.14 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | namespace oat\taoTaskQueue\model; |
23 | |
24 | use oat\tao\model\taskQueue\QueueDispatcherInterface; |
25 | use oat\tao\model\taskQueue\QueueInterface; |
26 | use oat\tao\model\taskQueue\QueuerInterface; |
27 | use oat\tao\model\taskQueue\Task\TaskInterface; |
28 | use oat\tao\model\taskQueue\TaskLogInterface; |
29 | use oat\tao\model\taskQueue\Worker\AbstractWorker; |
30 | |
31 | /** |
32 | * Processes tasks from the queue service running for limited/unlimited time |
33 | * |
34 | * @author Gyula Szucs <gyula@taotesting.com> |
35 | */ |
36 | final class LongRunningWorker extends AbstractWorker |
37 | { |
38 | public const WAIT_INTERVAL = 1; // sec |
39 | public const MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE = 30; //max sleeping time if working on only one queue |
40 | |
41 | private $maxIterations = 0; //0 means infinite iteration |
42 | private $iterations = 0; |
43 | private $shutdown; |
44 | private $paused; |
45 | private $iterationsWithOutTask = 0; |
46 | private $handleSignals; |
47 | private $sigHandlersRegistered = false; |
48 | |
49 | public function __construct(QueuerInterface $queuer, TaskLogInterface $taskLog, $handleSignals = true) |
50 | { |
51 | parent::__construct($queuer, $taskLog); |
52 | $this->handleSignals = $handleSignals; |
53 | } |
54 | |
55 | protected function getLogContext() |
56 | { |
57 | $rs = [ |
58 | 'PID' => getmypid(), |
59 | 'Iteration' => $this->iterations |
60 | ]; |
61 | |
62 | if ($this->queuer instanceof QueueInterface) { |
63 | $rs['QueueName'] = $this->queuer->getName(); |
64 | } |
65 | |
66 | return $rs; |
67 | } |
68 | |
69 | /** |
70 | * @inheritdoc |
71 | */ |
72 | public function run() |
73 | { |
74 | $this->registerSigHandlers(); |
75 | $this->logDebug('Starting LongRunningWorker.', $this->getLogContext()); |
76 | |
77 | while ($this->isRunning()) { |
78 | if ($this->paused) { |
79 | $this->logInfo('Worker paused... ', $this->getLogContext()); |
80 | usleep(self::WAIT_INTERVAL * 1000000); |
81 | continue; |
82 | } |
83 | |
84 | ++$this->iterations; |
85 | |
86 | try { |
87 | $this->logDebug('Fetching tasks from queue ', $this->getLogContext()); |
88 | |
89 | $task = $this->queuer->dequeue(); |
90 | |
91 | // if no task to process, sleep for the specified time and continue. |
92 | if (!$task) { |
93 | ++$this->iterationsWithOutTask; |
94 | $waitInterval = $this->getWaitInterval(); |
95 | $this->logDebug('No tasks found. Sleeping for ' . $waitInterval . ' sec', $this->getLogContext()); |
96 | usleep($waitInterval * 1000000); |
97 | |
98 | continue; |
99 | } |
100 | |
101 | // we have task, so set this back to 0 |
102 | $this->iterationsWithOutTask = 0; |
103 | |
104 | if (!$task instanceof TaskInterface) { |
105 | $this->logWarning( |
106 | 'The received queue item (' . $task . ') not processable.', |
107 | $this->getLogContext() |
108 | ); |
109 | |
110 | continue; |
111 | } |
112 | |
113 | $this->processTask($task); |
114 | |
115 | unset($task); |
116 | } catch (\Exception $e) { |
117 | $this->logError( |
118 | 'Fetching data from queue failed with MSG: ' . $e->getMessage(), |
119 | $this->getLogContext() |
120 | ); |
121 | |
122 | continue; |
123 | } |
124 | } |
125 | |
126 | $this->logDebug('LongRunningWorker finished.', $this->getLogContext()); |
127 | } |
128 | |
129 | /** |
130 | * @inheritdoc |
131 | */ |
132 | public function setMaxIterations($maxIterations) |
133 | { |
134 | if (!$this->queuer instanceof QueueInterface) { |
135 | throw new \LogicException('Limit can be set only if a dedicated queue is set.'); |
136 | } |
137 | |
138 | $this->maxIterations = $maxIterations * $this->queuer->getNumberOfTasksToReceive(); |
139 | |
140 | return $this; |
141 | } |
142 | |
143 | /** |
144 | * @return bool |
145 | */ |
146 | private function isRunning() |
147 | { |
148 | if ($this->handleSignals) { |
149 | pcntl_signal_dispatch(); |
150 | } |
151 | |
152 | if ($this->shutdown) { |
153 | return false; |
154 | } |
155 | |
156 | if ($this->maxIterations > 0) { |
157 | return $this->iterations < $this->maxIterations && $this->hasEnoughSpace(); |
158 | } |
159 | |
160 | return true; |
161 | } |
162 | |
163 | /** |
164 | * Register signal handlers that a worker should respond to. |
165 | * |
166 | * TERM/INT/QUIT: Shutdown after the current job is finished then exit. |
167 | * USR2: Pause worker, no new jobs will be processed but the current one will be finished. |
168 | * CONT: Resume worker. |
169 | */ |
170 | private function registerSigHandlers() |
171 | { |
172 | if ($this->handleSignals && !$this->sigHandlersRegistered) { |
173 | if (!function_exists('pcntl_signal')) { |
174 | $this->logError('Please make sure that "pcntl" is enabled.', $this->getLogContext()); |
175 | throw new \RuntimeException('Please make sure that "pcntl" is enabled.'); |
176 | } |
177 | |
178 | declare(ticks=1); |
179 | |
180 | pcntl_signal(SIGTERM, [$this, 'shutdown']); |
181 | pcntl_signal(SIGINT, [$this, 'shutdown']); |
182 | pcntl_signal(SIGQUIT, [$this, 'shutdown']); |
183 | pcntl_signal(SIGUSR2, [$this, 'pauseProcessing']); |
184 | pcntl_signal(SIGCONT, [$this, 'unPauseProcessing']); |
185 | |
186 | $this->sigHandlersRegistered = true; |
187 | |
188 | $this->logDebug('Finished setting up signal handlers', $this->getLogContext()); |
189 | } |
190 | } |
191 | |
192 | public function shutdown() |
193 | { |
194 | $this->logInfo('TERM/INT/QUIT received; shutting down gracefully...', $this->getLogContext()); |
195 | $this->shutdown = true; |
196 | } |
197 | |
198 | public function pauseProcessing() |
199 | { |
200 | $this->logInfo('USR2 received; pausing task processing...', $this->getLogContext()); |
201 | $this->paused = true; |
202 | } |
203 | |
204 | public function unPauseProcessing() |
205 | { |
206 | $this->logInfo('CONT received; resuming task processing...', $this->getLogContext()); |
207 | $this->paused = false; |
208 | } |
209 | |
210 | /** |
211 | * Calculate the sleeping time dynamically in case of no task to work on. |
212 | * |
213 | * @return int (sec) |
214 | */ |
215 | private function getWaitInterval() |
216 | { |
217 | if ($this->queuer instanceof QueueInterface) { |
218 | $waitTime = $this->iterationsWithOutTask * self::WAIT_INTERVAL; |
219 | |
220 | return min($waitTime, self::MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE); |
221 | } elseif ($this->queuer instanceof QueueDispatcherInterface) { |
222 | return (int) $this->queuer->getWaitTime(); |
223 | } else { |
224 | return self::WAIT_INTERVAL; |
225 | } |
226 | } |
227 | |
228 | private function hasEnoughSpace(): bool |
229 | { |
230 | if ($this->iterationsWithOutTask || $this->queuer->hasPreFetchedMessages()) { |
231 | return true; |
232 | } |
233 | |
234 | $freeSpace = $this->maxIterations - $this->iterations; |
235 | |
236 | return $freeSpace >= $this->queuer->getNumberOfTasksToReceive(); |
237 | } |
238 | } |