Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 136
0.00% covered (danger)
0.00%
0 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
Worker
0.00% covered (danger)
0.00%
0 / 136
0.00% covered (danger)
0.00%
0 / 13
1892
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 run
0.00% covered (danger)
0.00%
0 / 32
0.00% covered (danger)
0.00%
0 / 1
56
 processTask
0.00% covered (danger)
0.00%
0 / 55
0.00% covered (danger)
0.00%
0 / 1
272
 isRemoteTaskSynchroniser
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 getRemoteStatus
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 setMaxIterations
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 setDedicatedQueue
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 isRunning
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 registerSigHandlers
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 shutdown
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 pauseProcessing
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 unPauseProcessing
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getWaitInterval
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\taoTaskQueue\model;
23
24use oat\oatbox\log\LoggerAwareTrait;
25use common_report_Report as Report;
26use oat\taoTaskQueue\model\Entity\TaskLogEntity;
27use oat\taoTaskQueue\model\Task\CallbackTaskInterface;
28use oat\taoTaskQueue\model\Task\RemoteTaskSynchroniserInterface;
29use oat\taoTaskQueue\model\Task\TaskInterface;
30use oat\taoTaskQueue\model\ValueObjects\TaskLogCategorizedStatus;
31
/**
 * Processes tasks from the queue.
 *
 * The worker loops in {@see Worker::run()}: it dequeues one item per iteration (from the dedicated queue
 * if one was set, otherwise through the queue dispatcher), runs it via {@see Worker::processTask()} and
 * sleeps when nothing was received. When signal handling is enabled, TERM/INT/QUIT request a shutdown,
 * USR2 pauses and CONT resumes the loop.
 *
 * @deprecated Use \oat\taoTaskQueue\model\LongRunningWorker
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
final class Worker implements WorkerInterface
{
    use LoggerAwareTrait;

    public const WAIT_INTERVAL = 1; // sec
    public const MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE = 30; //max sleeping time if working on only one queue

    /**
     * @var QueueDispatcherInterface
     */
    private $queueService;

    /**
     * @var QueueInterface
     */
    private $dedicatedQueue;

    /**
     * @var int Upper bound for the number of iterations; 0 means infinite iteration
     */
    private $maxIterations = 0;

    /**
     * @var int Number of non-paused loop iterations started so far
     */
    private $iterations = 0;

    /**
     * @var bool Set by the shutdown handler; the loop stops at the next isRunning() check
     */
    private $shutdown = false;

    /**
     * @var bool While true the loop only sleeps and does not dequeue anything
     */
    private $paused = false;

    /**
     * @var int Consecutive iterations that received no task; drives the dedicated-queue back-off
     */
    private $iterationsWithOutTask = 0;

    /**
     * @var int|false Value of getmypid() taken at construction time
     */
    private $processId;

    /**
     * @var array Context passed to every log call (PID, Iteration, optionally QueueName)
     */
    private $logContext;

    /**
     * @var TaskLogInterface
     */
    private $taskLog;

    /**
     * @var bool
     */
    private $handleSignals;

    /**
     * @param QueueDispatcherInterface $queueService
     * @param TaskLogInterface         $taskLog
     * @param bool                     $handleSignals When true, pcntl signal handlers are registered
     *
     * @throws \RuntimeException If $handleSignals is true and the "pcntl" extension is not available
     */
    public function __construct(
        QueueDispatcherInterface $queueService,
        TaskLogInterface $taskLog,
        $handleSignals = true
    ) {
        $this->queueService = $queueService;
        $this->taskLog = $taskLog;
        $this->handleSignals = $handleSignals;
        $this->processId = getmypid();

        $this->logContext = [
            'PID' => $this->processId
        ];

        if ($handleSignals) {
            $this->registerSigHandlers();
        }
    }

    /**
     * Runs the processing loop until a shutdown is requested or the iteration limit is reached.
     *
     * @inheritdoc
     */
    public function run()
    {
        $this->logDebug('Starting worker.', $this->logContext);

        while ($this->isRunning()) {
            if ($this->paused) {
                $this->logDebug('Paused... ', array_merge($this->logContext, [
                    'Iteration' => $this->iterations
                ]));
                usleep(self::WAIT_INTERVAL * 1000000);
                continue;
            }

            ++$this->iterations;

            $this->logContext = array_merge($this->logContext, [
                'Iteration' => $this->iterations
            ]);

            try {
                $this->logDebug('Fetching tasks from queue ', $this->logContext);

                // if there is a dedicated queue set, let's do dequeue on that one
                // otherwise using the built-in strategy to get a new task from any registered queue
                $task = $this->dedicatedQueue instanceof QueueInterface
                    ? $this->dedicatedQueue->dequeue()
                    : $this->queueService->dequeue();

                // if no task to process, sleep for the specified time and continue.
                if (!$task) {
                    ++$this->iterationsWithOutTask;
                    $waitInterval = $this->getWaitInterval();
                    $this->logDebug('Sleeping for ' . $waitInterval . ' sec', $this->logContext);
                    usleep($waitInterval * 1000000);

                    continue;
                }

                // we have task, so set this back to 0
                $this->iterationsWithOutTask = 0;

                if (!$task instanceof TaskInterface) {
                    // The item is described defensively: concatenating an arbitrary value could raise an
                    // \Error (object without __toString) that the catch below would not intercept.
                    $this->logWarning(
                        'The received queue item (' . $this->describeQueueItem($task) . ') not processable.',
                        $this->logContext
                    );
                    continue;
                }

                $this->processTask($task);

                unset($task);
            } catch (\Exception $e) {
                $this->logError('Fetching data from queue failed with MSG: ' . $e->getMessage(), $this->logContext);
            }
        }

        $this->logDebug('Worker finished.', $this->logContext);
    }

    /**
     * Executes one task and stores its outcome in the task log.
     *
     * The task log entry is moved from DEQUEUED to RUNNING first; if no row was touched the task is
     * left alone and STATUS_UNKNOWN is returned. Otherwise the task is invoked, its report is saved
     * (or, for a remote synchroniser whose remote task is still running, the task is re-enqueued as a
     * clone and the old log entry is deleted), the parent log is updated when applicable and the
     * message is acknowledged on the queue.
     *
     * @inheritdoc
     */
    public function processTask(TaskInterface $task)
    {
        $report = Report::createInfo(__('Running task %s', $task->getId()));

        try {
            $this->logDebug('Processing task ' . $task->getId(), $this->logContext);

            $rowsTouched = $this->taskLog->setStatus(
                $task->getId(),
                TaskLogInterface::STATUS_RUNNING,
                TaskLogInterface::STATUS_DEQUEUED
            );

            // if the task is being executed by another worker, just return, no report needs to be saved
            if (!$rowsTouched) {
                $this->logDebug(
                    'Task ' . $task->getId() . ' seems to be processed by another worker.',
                    $this->logContext
                );

                return TaskLogInterface::STATUS_UNKNOWN;
            }

            // let the task know that it is called from a worker
            $task->applyWorkerContext();

            // execute the task
            $taskReport = $task();

            if (!$taskReport instanceof Report) {
                $this->logWarning('Task ' . $task->getId() . ' should return a report object.', $this->logContext);
                $taskReport = Report::createInfo(__('Task not returned any report.'));
            }

            $report->add($taskReport);
            unset($taskReport, $rowsTouched);
        } catch (\Throwable $e) {
            // \Throwable (not only \Exception): a task raising an \Error must still be recorded as failed
            // and acknowledged below, otherwise its log entry would stay in RUNNING.
            $this->logError(
                'Executing task ' . $task->getId() . ' failed with MSG: ' . $e->getMessage(),
                $this->logContext
            );
            $report = Report::createFailure(__('Executing task %s failed', $task->getId()));
        }

        // Initial status
        $status = $report->getType() == Report::TYPE_ERROR || $report->containsError()
            ? TaskLogInterface::STATUS_FAILED
            : TaskLogInterface::STATUS_COMPLETED;

        // Change the status if the task has children
        if ($task->hasChildren() && $status == TaskLogInterface::STATUS_COMPLETED) {
            $status = TaskLogInterface::STATUS_CHILD_RUNNING;
        }

        $cloneCreated = false;

        // if the task is a special sync task: the status of the parent task depends on the status of the remote task.
        if ($this->isRemoteTaskSynchroniser($task) && $status == TaskLogInterface::STATUS_COMPLETED) {
            // Ask for the remote status exactly once so both branches below decide on the same value.
            // the RESTApi returns TaskLogCategorizedStatus values
            // NOTE(review): comparisons are kept loose because the return type is only documented as mixed.
            $remoteStatus = $this->getRemoteStatus($task);

            $remoteStillInProgress = in_array(
                $remoteStatus,
                [
                    TaskLogCategorizedStatus::STATUS_CREATED,
                    TaskLogCategorizedStatus::STATUS_IN_PROGRESS,
                ]
            );

            if ($remoteStillInProgress) {
                // if the remote task is still in progress, we have to reschedule this task
                if ($this->queueService->count() <= 1) {
                    // If there is less than or exactly one task in the queue, let's sleep a bit, in order not to
                    // regenerate the same task too much
                    sleep(3);
                }

                $cloneCreated = $this->queueService->enqueue(clone $task, $task->getLabel());
            } elseif ($remoteStatus == TaskLogCategorizedStatus::STATUS_FAILED) {
                // if the remote task status is failed
                $status = TaskLogInterface::STATUS_FAILED;
            }
        }

        if (!$cloneCreated) {
            $this->taskLog->setReport($task->getId(), $report, $status);
        } else {
            // if there is a clone, delete the old task log
            //TODO: once we have the centralized way of cleaning up the log table, this should be refactored
            $this->taskLog->getBroker()->deleteById($task->getId());
        }

        // Update parent
        if ($task->hasParent()) {
            /** @var TaskLogEntity $parentLogTask */
            $parentLogTask = $this->taskLog->getById($task->getParentId());
            if (!$parentLogTask->isMasterStatus()) {
                $this->taskLog->updateParent($task->getParentId());
            }
        }

        unset($report);

        // delete message from queue
        $this->queueService->acknowledge($task);

        return $status;
    }

    /**
     * Builds a log-safe textual description of a dequeued item that is not a task.
     *
     * Scalars and objects implementing __toString are rendered as their string value; any other object
     * is represented by its class name and every other value by its type name.
     *
     * @param mixed $item
     * @return string
     */
    private function describeQueueItem($item): string
    {
        if (is_scalar($item)) {
            return (string) $item;
        }

        if (is_object($item)) {
            return method_exists($item, '__toString')
                ? (string) $item
                : get_class($item);
        }

        return gettype($item);
    }

    /**
     * Tells whether the task itself, or the callable wrapped by a callback task, is a remote synchroniser.
     *
     * @param TaskInterface $task
     * @return bool
     */
    private function isRemoteTaskSynchroniser(TaskInterface $task): bool
    {
        return $task instanceof RemoteTaskSynchroniserInterface
            || (
                $task instanceof CallbackTaskInterface
                && $task->getCallable() instanceof RemoteTaskSynchroniserInterface
            );
    }

    /**
     * Returns the remote status reported by the synchroniser behind the task.
     *
     * For a callback task whose callable is a synchroniser the callable is asked; in every other case
     * the task itself is asked. Must only be called when isRemoteTaskSynchroniser() returned true.
     *
     * @param TaskInterface $task
     * @return mixed
     */
    private function getRemoteStatus(TaskInterface $task)
    {
        if ($task instanceof CallbackTaskInterface) {
            $callable = $task->getCallable();

            if ($callable instanceof RemoteTaskSynchroniserInterface) {
                return $callable->getRemoteStatus();
            }
        }

        return $task->getRemoteStatus();
    }

    /**
     * Only set-able if there is a dedicated queue set.
     * @deprecated
     *
     * @inheritdoc
     */
    public function setMaxIterations($maxIterations)
    {
        $this->maxIterations = $maxIterations;

        return $this;
    }

    /**
     * Binds the worker to a single queue and derives the iteration limit from it.
     *
     * The limit becomes $maxIterations multiplied by the queue's number of tasks to receive, and the
     * queue name is added to the log context.
     *
     * @inheritdoc
     */
    public function setDedicatedQueue(QueueInterface $queue, $maxIterations = 0)
    {
        $this->dedicatedQueue = $queue;
        $this->maxIterations  = (int) $maxIterations * $this->dedicatedQueue->getNumberOfTasksToReceive();

        $this->logContext['QueueName'] = $queue->getName();

        return $this;
    }

    /**
     * Decides whether the loop should start another iteration.
     *
     * Pending signals are dispatched first (when signal handling is enabled) so that a shutdown
     * requested since the previous check is taken into account.
     *
     * @return bool
     */
    private function isRunning(): bool
    {
        if ($this->handleSignals) {
            pcntl_signal_dispatch();
        }

        if ($this->shutdown) {
            return false;
        }

        if ($this->maxIterations > 0) {
            return $this->iterations < $this->maxIterations;
        }

        return true;
    }

    /**
     * Register signal handlers that a worker should respond to.
     *
     * TERM/INT/QUIT: Shutdown after the current job is finished then exit.
     * USR2: Pause worker, no new jobs will be processed but the current one will be finished.
     * CONT: Resume worker.
     *
     * @throws \RuntimeException If the "pcntl" extension is not available
     */
    private function registerSigHandlers(): void
    {
        if (!function_exists('pcntl_signal')) {
            $this->logError('Please make sure that "pcntl" is enabled.', $this->logContext);
            throw new \RuntimeException('Please make sure that "pcntl" is enabled.');
        }

        declare(ticks=1);

        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
        pcntl_signal(SIGQUIT, [$this, 'shutdown']);
        pcntl_signal(SIGUSR2, [$this, 'pauseProcessing']);
        pcntl_signal(SIGCONT, [$this, 'unPauseProcessing']);

        $this->logDebug('Finished setting up signal handlers', $this->logContext);
    }

    /**
     * Signal handler: flags the worker so the loop ends at the next isRunning() check.
     */
    public function shutdown()
    {
        $this->logDebug('TERM/INT/QUIT received; shutting down gracefully...', $this->logContext);
        $this->shutdown = true;
    }

    /**
     * Signal handler: stops dequeuing new tasks until processing is resumed.
     */
    public function pauseProcessing()
    {
        $this->logDebug('USR2 received; pausing task processing...', $this->logContext);
        $this->paused = true;
    }

    /**
     * Signal handler: resumes dequeuing after a pause.
     */
    public function unPauseProcessing()
    {
        $this->logDebug('CONT received; resuming task processing...', $this->logContext);
        $this->paused = false;
    }

    /**
     * Calculate the sleeping time dynamically in case of no task to work on.
     *
     * With a dedicated queue the wait grows by WAIT_INTERVAL for every consecutive empty iteration,
     * capped at MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE; otherwise the dispatcher's wait time is used.
     *
     * @return int (sec)
     */
    private function getWaitInterval(): int
    {
        if ($this->dedicatedQueue instanceof QueueInterface) {
            $waitTime = $this->iterationsWithOutTask * self::WAIT_INTERVAL;

            return min($waitTime, self::MAX_SLEEPING_TIME_FOR_DEDICATED_QUEUE);
        }

        return (int) $this->queueService->getWaitTime();
    }
}