Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
90.00% covered (success)
90.00%
117 / 130
77.78% covered (warning)
77.78%
7 / 9
CRAP
0.00% covered (danger)
0.00%
0 / 1
AbstractWorker
90.00% covered (success)
90.00%
117 / 130
77.78% covered (warning)
77.78%
7 / 9
35.16
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 processTask
88.79% covered (warning)
88.79%
95 / 107
0.00% covered (danger)
0.00%
0 / 1
18.46
 formatTaskLabel
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
3.14
 getLogContext
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 startUserSession
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
4
 isRemoteTaskSynchroniser
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
3
 getRemoteStatus
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
2
 isTaskCancelled
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getUserFactoryService
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017-2023 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\tao\model\taskQueue\Worker;
23
24use common_report_Report as Report;
25use oat\oatbox\log\LoggerAwareTrait;
26use oat\oatbox\session\StatelessSession;
27use oat\tao\model\taskQueue\QueuerInterface;
28use oat\tao\model\taskQueue\Task\CallbackTaskInterface;
29use oat\tao\model\taskQueue\Task\RemoteTaskSynchroniserInterface;
30use oat\tao\model\taskQueue\Task\TaskInterface;
31use oat\tao\model\taskQueue\Task\TaskLanguageLoader;
32use oat\tao\model\taskQueue\TaskLog\CategorizedStatus;
33use oat\tao\model\taskQueue\TaskLog\Entity\EntityInterface;
34use oat\tao\model\taskQueue\TaskLogInterface;
35use oat\oatbox\service\ServiceManagerAwareInterface;
36use oat\oatbox\service\ServiceManagerAwareTrait;
37use oat\generis\model\user\UserFactoryServiceInterface;
38use oat\generis\model\OntologyAwareTrait;
39use oat\oatbox\session\SessionService;
40
41abstract class AbstractWorker implements WorkerInterface, ServiceManagerAwareInterface
42{
43    use LoggerAwareTrait;
44    use ServiceManagerAwareTrait;
45    use OntologyAwareTrait;
46
47    /**
48     * @var QueuerInterface
49     */
50    protected $queuer;
51
52    /**
53     * @var TaskLogInterface
54     */
55    protected $taskLog;
56
57    public function __construct(QueuerInterface $queuer, TaskLogInterface $taskLog)
58    {
59        $this->taskLog = $taskLog;
60        $this->queuer = $queuer;
61    }
62
63    /**
64     * Because of BC, it is kept as public, later it can be set to protected.
65     *
66     * @param TaskInterface $task
67     * @return string
68     * @throws \common_exception_NotFound
69     */
70    public function processTask(TaskInterface $task)
71    {
72        if (!$this->isTaskCancelled($task)) {
73            $report = Report::createInfo(__('Running task %s', $task->getId()));
74            try {
75                $this->startUserSession($task);
76
77                $this->logInfo(
78                    sprintf(
79                        'Processing task %s [%s]',
80                        $this->formatTaskLabel($task),
81                        $task->getId()
82                    ),
83                    $this->getLogContext()
84                );
85
86                //Database operation in task log
87                $rowsTouched = $this->taskLog->setStatus(
88                    $task->getId(),
89                    TaskLogInterface::STATUS_RUNNING,
90                    TaskLogInterface::STATUS_DEQUEUED
91                );
92
93                // if the task is being executed by another worker, just return, no report needs to be saved
94                if (!$rowsTouched) {
95                    $this->logInfo(
96                        sprintf(
97                            'Task %s [%s] seems to be processed by another worker.',
98                            $this->formatTaskLabel($task),
99                            $task->getId()
100                        ),
101                        $this->getLogContext()
102                    );
103                    return TaskLogInterface::STATUS_UNKNOWN;
104                }
105
106                // let the task know that it is called from a worker
107                $task->applyWorkerContext();
108
109                // Load translations with platform language
110                $this->getServiceLocator()->get(TaskLanguageLoader::class)->loadTranslations($task);
111
112                // execute the task
113                $taskReport = $task();
114
115                $this->logInfo(
116                    sprintf(
117                        'Task %s [%s] has been processed.',
118                        $this->formatTaskLabel($task),
119                        $task->getId()
120                    ),
121                    $this->getLogContext()
122                );
123
124                if (!$taskReport instanceof Report) {
125                    $this->logWarning(
126                        sprintf(
127                            'Task %s [%s] should return a report object.',
128                            $this->formatTaskLabel($task),
129                            $task->getId()
130                        ),
131                        $this->getLogContext()
132                    );
133                    //todo: isn't this message confusinig?
134                    $taskReport = Report::createInfo(__('Task not returned any report.'));
135                }
136
137                $report->add($taskReport);
138
139                unset($taskReport, $rowsTouched);
140            } catch (\Error $e) {
141                $this->logCritical(
142                    sprintf(
143                        'Executing task %s [%s] failed with MSG: %s. [%s] Trace: %s',
144                        $this->formatTaskLabel($task),
145                        $task->getId(),
146                        $e->getMessage(),
147                        get_class($e),
148                        $e->getTraceAsString()
149                    ),
150                    $this->getLogContext()
151                );
152
153                $report = Report::createFailure(__('Executing task %s failed', $task->getId()));
154            } catch (\Exception $e) {
155                $this->logError(
156                    sprintf(
157                        'Executing task %s [%s] failed with MSG: %s. [%s] Trace: %s',
158                        $this->formatTaskLabel($task),
159                        $task->getId(),
160                        $e->getMessage(),
161                        get_class($e),
162                        $e->getTraceAsString()
163                    ),
164                    $this->getLogContext()
165                );
166                $report = Report::createFailure(__('Executing task %s failed', $task->getId()));
167            }
168
169            // Initializing status
170            $status = $report->getType() == Report::TYPE_ERROR || $report->containsError()
171                ? TaskLogInterface::STATUS_FAILED
172                : TaskLogInterface::STATUS_COMPLETED;
173
174            // Change the status if the task has children
175            if ($task->hasChildren() && $status == TaskLogInterface::STATUS_COMPLETED) {
176                $status = TaskLogInterface::STATUS_CHILD_RUNNING;
177            }
178
179            $cloneCreated = false;
180
181            // Check if the task is a special sync task: The status of the parent task depends on the status of the
182            // remote task.
183            if ($this->isRemoteTaskSynchroniser($task) && $status == TaskLogInterface::STATUS_COMPLETED) {
184                // if the remote task is still in progress, we have to reschedule this task
185                // the RESTApi returns TaskLogCategorizedStatus values
186                if (
187                    in_array(
188                        $this->getRemoteStatus($task),
189                        [CategorizedStatus::STATUS_CREATED, CategorizedStatus::STATUS_IN_PROGRESS]
190                    )
191                ) {
192                    if ($this->queuer->count() <= 1) {
193                        // If there is less than or exactly one task in the queue, let's sleep a bit
194                        // in order not to regenerate the same task too much
195                        sleep(3);
196                    }
197
198                    $cloneCreated = $this->queuer->enqueue(clone $task, $task->getLabel());
199                } elseif ($this->getRemoteStatus($task) == CategorizedStatus::STATUS_FAILED) {
200                    // if the remote task status is failed
201                    $status = TaskLogInterface::STATUS_FAILED;
202                }
203            }
204
205            if (!$cloneCreated) {
206                $this->taskLog->setReport($task->getId(), $report, $status);
207            } else {
208                // if there is a clone, delete the old task log
209                //TODO: once we have the centralized way of cleaning up the log table, this should be refactored
210                $this->taskLog->getBroker()->deleteById($task->getId());
211            }
212
213            // Update parent
214            if ($task->hasParent()) {
215                /** @var EntityInterface $parentLogTask */
216                $parentLogTask = $this->taskLog->getById($task->getParentId());
217                if (!$parentLogTask->isMasterStatus()) {
218                    $this->taskLog->updateParent($task->getParentId());
219                }
220            }
221
222            unset($report);
223        } else {
224            $this->taskLog->setReport(
225                $task->getId(),
226                Report::createInfo(__('Task %s has been cancelled, message was not processed.', $task->getId())),
227                TaskLogInterface::STATUS_CANCELLED
228            );
229
230            $status = TaskLogInterface::STATUS_CANCELLED;
231        }
232
233        // delete message from queue
234        $this->queuer->acknowledge($task);
235
236        return $status;
237    }
238
239    protected function formatTaskLabel(TaskInterface $task): string
240    {
241        $label = $task->getLabel();
242
243        if (!is_string($label)) {
244            return '';
245        }
246
247        return strlen($label) > 255 ? '...' . substr($label, -252) : $label;
248    }
249
250
251    protected function getLogContext()
252    {
253        return [];
254    }
255
256    /**
257     * @param TaskInterface $task
258     * @throws \oat\oatbox\service\exception\InvalidServiceManagerException
259     * @throws \Exception
260     */
261    private function startUserSession(TaskInterface $task)
262    {
263        /** @var SessionService $sessionService */
264        $sessionService = $this->getServiceLocator()->get(SessionService::class);
265        if (
266            $task->getOwner()
267            && $sessionService->getCurrentSession()->getUser()->getIdentifier() !== $task->getOwner()
268        ) {
269            $user = $this->getUserFactoryService()->createUser($this->getResource($task->getOwner()));
270            $session = new StatelessSession($user);
271            $sessionService->setSession($session);
272        // Create Anonymous session if no owner
273        } elseif (!$task->getOwner()) {
274            \common_session_SessionManager::endSession();
275        }
276    }
277
278    /**
279     * @param TaskInterface $task
280     * @return bool
281     */
282    private function isRemoteTaskSynchroniser(TaskInterface $task)
283    {
284        return $task instanceof RemoteTaskSynchroniserInterface
285            || ($task instanceof CallbackTaskInterface
286                && $task->getCallable() instanceof RemoteTaskSynchroniserInterface);
287    }
288
289    /**
290     * @param TaskInterface|RemoteTaskSynchroniserInterface $task
291     * @return mixed
292     */
293    private function getRemoteStatus(TaskInterface $task)
294    {
295        return $task instanceof CallbackTaskInterface
296            ? $task->getCallable()->getRemoteStatus()
297            : $task->getRemoteStatus();
298    }
299
300    /**
301     * @param TaskInterface $task
302     * @return bool
303     */
304    private function isTaskCancelled(TaskInterface $task)
305    {
306        return $this->taskLog->getStatus($task->getId()) === TaskLogInterface::STATUS_CANCELLED;
307    }
308
309    /**
310     * @return UserFactoryServiceInterface
311     */
312    private function getUserFactoryService()
313    {
314        return $this->getServiceLocator()->get(UserFactoryServiceInterface::SERVICE_ID);
315    }
316}