Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
56.61% covered (warning)
56.61%
107 / 189
31.58% covered (danger)
31.58%
6 / 19
CRAP
0.00% covered (danger)
0.00%
0 / 1
RdsTaskLogBroker
56.61% covered (warning)
56.61%
107 / 189
31.58% covered (danger)
31.58%
6 / 19
128.41
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getTaskExecutionTimesByDateRange
94.12% covered (success)
94.12%
32 / 34
0.00% covered (danger)
0.00%
0 / 1
4.00
 createContainer
100.00% covered (success)
100.00%
31 / 31
100.00% covered (success)
100.00%
1 / 1
3
 add
93.75% covered (success)
93.75%
15 / 16
0.00% covered (danger)
0.00%
0 / 1
4.00
 getTypes
100.00% covered (success)
100.00%
12 / 12
100.00% covered (success)
100.00%
1 / 1
1
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 updateStatus
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
6
 addReport
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
2
 getStats
0.00% covered (danger)
0.00%
0 / 19
0.00% covered (danger)
0.00%
0 / 1
2
 archive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 cancel
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 archiveCollection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 cancelCollection
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 deleteById
72.73% covered (warning)
72.73%
8 / 11
0.00% covered (danger)
0.00%
0 / 1
2.08
 buildCounterStatusSql
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
20
 getTableName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 updateCollectionStatus
0.00% covered (danger)
0.00%
0 / 16
0.00% covered (danger)
0.00%
0 / 1
6
 getPersistence
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 getQueryBuilder
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2020 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22declare(strict_types=1);
23
24namespace oat\tao\model\taskQueue\TaskLog\Broker;
25
26use common_persistence_sql_SchemaManager;
27use common_persistence_SqlPersistence as SqlPersistence;
28use common_persistence_Persistence as Persistence;
29use common_Utils;
30use Doctrine\DBAL\Connection;
31use Doctrine\DBAL\ParameterType;
32use Doctrine\DBAL\Query\QueryBuilder;
33use common_report_Report as Report;
34use Exception;
35use DateTime;
36use oat\generis\persistence\PersistenceManager;
37use oat\tao\model\taskQueue\QueueDispatcherInterface;
38use oat\tao\model\taskQueue\Task\CallbackTaskInterface;
39use oat\tao\model\taskQueue\Task\TaskInterface;
40use oat\tao\model\taskQueue\TaskLog\CategorizedStatus;
41use oat\tao\model\taskQueue\TaskLog\CollectionInterface;
42use oat\tao\model\taskQueue\TaskLog\Entity\EntityInterface;
43use oat\tao\model\taskQueue\TaskLog\TaskLogFilter;
44use oat\tao\model\taskQueue\TaskLog\TasksLogsStats;
45use oat\tao\model\taskQueue\TaskLogInterface;
46use oat\oatbox\log\LoggerAwareTrait;
47
48/**
49 * Storing message logs in RDS.
50 *
51 * @author Gyula Szucs <gyula@taotesting.com>
52 */
53class RdsTaskLogBroker extends AbstractTaskLogBroker
54{
55    use LoggerAwareTrait;
56
57    /** @var string */
58    private $persistenceId;
59
60    /** @var SqlPersistence */
61    protected $persistence;
62
63    public function __construct(string $persistenceId, string $containerName = null)
64    {
65        $this->persistenceId = $persistenceId;
66        $this->containerName = $containerName ?? self::DEFAULT_CONTAINER_NAME;
67    }
68
69    public function getTaskExecutionTimesByDateRange(DateTime $from, DateTime $to): array
70    {
71        $collection = [];
72
73        try {
74            $qb = $this->getQueryBuilder();
75            $qb
76                ->select(
77                    TaskLogBrokerInterface::COLUMN_ID,
78                    TaskLogBrokerInterface::COLUMN_CREATED_AT,
79                    TaskLogBrokerInterface::COLUMN_UPDATED_AT
80                )
81                ->from($this->getTableName())
82                ->where(
83                    $qb->expr()->in(
84                        TaskLogBrokerInterface::COLUMN_STATUS,
85                        [
86                            $qb->expr()->literal(TaskLogInterface::STATUS_COMPLETED),
87                            $qb->expr()->literal(TaskLogInterface::STATUS_ARCHIVED),
88                        ]
89                    ),
90                    $qb->expr()->gte(TaskLogBrokerInterface::COLUMN_CREATED_AT, ':from'),
91                    $qb->expr()->lte(TaskLogBrokerInterface::COLUMN_CREATED_AT, ':to')
92                )
93                ->setParameters([
94                    'from' => $from->format('Y-m-d H:i:s'),
95                    'to' => $to->format('Y-m-d H:i:s')
96                ]);
97
98            $results = $qb->execute();
99
100            while (($row = $results->fetchAssociative()) !== false) {
101                if (empty($row[TaskLogBrokerInterface::COLUMN_UPDATED_AT])) {
102                    continue;
103                }
104                $collection[$row[TaskLogBrokerInterface::COLUMN_ID]] =
105                    strtotime($row[TaskLogBrokerInterface::COLUMN_UPDATED_AT])
106                    - strtotime($row[TaskLogBrokerInterface::COLUMN_CREATED_AT]);
107            }
108        } catch (Exception $exception) {
109            $this->logError('Searching for task logs failed with MSG: ' . $exception->getMessage());
110        }
111
112        return $collection;
113    }
114
115    /**
116     * @inheritdoc
117     */
118    public function createContainer(): void
119    {
120        /** @var common_persistence_sql_SchemaManager $schemaManager */
121        $schemaManager = $this->getPersistence()->getSchemaManager();
122
123        $fromSchema = $schemaManager->createSchema();
124        $toSchema = clone $fromSchema;
125
126        // if our table does not exist, let's create it
127        if (false === $fromSchema->hasTable($this->getTableName())) {
128            $table = $toSchema->createTable($this->getTableName());
129            $table->addOption('engine', 'InnoDB');
130            $table->addColumn(self::COLUMN_ID, 'string', ["notnull" => true, "length" => 255]);
131            $table->addColumn(
132                self::COLUMN_PARENT_ID,
133                'string',
134                ["notnull" => false, "length" => 255, "default" => null]
135            );
136            $table->addColumn(self::COLUMN_TASK_NAME, 'string', ["notnull" => true, "length" => 255]);
137            $table->addColumn(self::COLUMN_PARAMETERS, 'text', ["notnull" => false, "default" => null]);
138            $table->addColumn(self::COLUMN_LABEL, 'string', ["notnull" => false, "length" => 255]);
139            $table->addColumn(self::COLUMN_STATUS, 'string', ["notnull" => true, "length" => 50]);
140            $table->addColumn(self::COLUMN_MASTER_STATUS, 'boolean', ["default" => 0]);
141            $table->addColumn(self::COLUMN_OWNER, 'string', ["notnull" => false, "length" => 255, "default" => null]);
142            $table->addColumn(self::COLUMN_REPORT, 'text', ["notnull" => false, "default" => null]);
143            $table->addColumn(self::COLUMN_CREATED_AT, 'datetime', ['notnull' => true]);
144            $table->addColumn(self::COLUMN_UPDATED_AT, 'datetime', ['notnull' => false]);
145            $table->setPrimaryKey([self::COLUMN_ID]);
146            $table->addIndex(
147                [self::COLUMN_TASK_NAME, self::COLUMN_OWNER],
148                $this->getTableName() . 'IDX_task_name_owner'
149            );
150            $table->addIndex([self::COLUMN_STATUS], $this->getTableName() . 'IDX_status');
151            $table->addIndex([self::COLUMN_CREATED_AT], $this->getTableName() . 'IDX_created_at');
152
153            $queries = $this->getPersistence()->getPlatForm()->getMigrateSchemaSql($fromSchema, $toSchema);
154            foreach ($queries as $query) {
155                $this->getPersistence()->exec($query);
156            }
157        }
158    }
159
160    /**
161     * @inheritdoc
162     */
163    public function add(TaskInterface $task, string $status, string $label = null): void
164    {
165        $this->getPersistence()->insert($this->getTableName(), [
166            self::COLUMN_ID   => (string) $task->getId(),
167            self::COLUMN_PARENT_ID  => $task->getParentId() ? (string) $task->getParentId() : null,
168            self::COLUMN_TASK_NAME => $task instanceof CallbackTaskInterface && is_object($task->getCallable())
169                ? get_class($task->getCallable())
170                : get_class($task),
171            self::COLUMN_PARAMETERS => json_encode($task->getParameters()),
172            self::COLUMN_LABEL => (string) $label,
173            self::COLUMN_STATUS => $status,
174            self::COLUMN_OWNER => (string) $task->getOwner(),
175            self::COLUMN_CREATED_AT => $task->getCreatedAt()->format(
176                $this->getPersistence()->getPlatForm()->getDateTimeFormatString()
177            ),
178            self::COLUMN_UPDATED_AT => $this->getPersistence()->getPlatForm()->getNowExpression(),
179            self::COLUMN_MASTER_STATUS => $task->isMasterStatus(),
180        ], $this->getTypes());
181    }
182
183    protected function getTypes(): array
184    {
185        return [
186            ParameterType::STRING,
187            ParameterType::STRING,
188            ParameterType::STRING,
189            ParameterType::STRING,
190            ParameterType::STRING,
191            ParameterType::STRING,
192            ParameterType::STRING,
193            ParameterType::STRING,
194            ParameterType::STRING,
195            ParameterType::BOOLEAN,
196        ];
197    }
198
199    public function __toPhpCode()
200    {
201        return 'new ' . get_called_class() . '('
202            . common_Utils::toHumanReadablePhpString($this->persistenceId)
203            . ', '
204            . common_Utils::toHumanReadablePhpString($this->containerName)
205            . ')';
206    }
207
208    /**
209     * @inheritdoc
210     */
211    public function updateStatus(string $taskId, string $newStatus, string $prevStatus = null): int
212    {
213        $qb = $this->getQueryBuilder()
214            ->update($this->getTableName())
215            ->set(self::COLUMN_STATUS, ':status_new')
216            ->set(self::COLUMN_UPDATED_AT, ':updated_at')
217            ->where(self::COLUMN_ID . ' = :id')
218            ->setParameter('id', (string) $taskId)
219            ->setParameter('status_new', (string) $newStatus)
220            ->setParameter('updated_at', $this->getPersistence()->getPlatForm()->getNowExpression());
221
222        if ($prevStatus) {
223            $qb->andWhere(self::COLUMN_STATUS . ' = :status_prev')
224                ->setParameter('status_prev', (string) $prevStatus);
225        }
226
227        return $qb->execute();
228    }
229
230    /**
231     * @inheritdoc
232     */
233    public function addReport(string $taskId, Report $report, string $newStatus = null): int
234    {
235        $qb = $this->getQueryBuilder()
236            ->update($this->getTableName())
237            ->set(self::COLUMN_REPORT, ':report')
238            ->set(self::COLUMN_STATUS, ':status_new')
239            ->set(self::COLUMN_UPDATED_AT, ':updated_at')
240            ->andWhere(self::COLUMN_ID . ' = :id')
241            ->setParameter('id', (string) $taskId)
242            ->setParameter('report', json_encode($report))
243            ->setParameter('status_new', (string) $newStatus)
244            ->setParameter('updated_at', $this->getPersistence()->getPlatForm()->getNowExpression());
245
246        return $qb->execute();
247    }
248
249    /**
250     * @inheritdoc
251     */
252    public function getStats(TaskLogFilter $filter): TasksLogsStats
253    {
254        $qb = $this->getQueryBuilder()
255            ->from($this->getTableName());
256
257        $qb->select(
258            $this->buildCounterStatusSql(
259                TasksLogsStats::IN_PROGRESS_TASKS,
260                CategorizedStatus::getMappedStatuses(CategorizedStatus::STATUS_IN_PROGRESS)
261            ) . ', '
262                . $this->buildCounterStatusSql(
263                    TasksLogsStats::COMPLETED_TASKS,
264                    CategorizedStatus::getMappedStatuses(CategorizedStatus::STATUS_COMPLETED)
265                ) . ', '
266                . $this->buildCounterStatusSql(
267                    TasksLogsStats::FAILED_TASKS,
268                    CategorizedStatus::getMappedStatuses(CategorizedStatus::STATUS_FAILED)
269                )
270        );
271
272        $filter->applyFilters($qb);
273
274        $row = $qb->execute()->fetch();
275
276        return TasksLogsStats::buildFromArray($row);
277    }
278
279    /**
280     * @inheritdoc
281     */
282    public function archive(EntityInterface $entity): bool
283    {
284        return (bool)$this->updateStatus($entity->getId(), TaskLogInterface::STATUS_ARCHIVED);
285    }
286
287    /**
288     * @inheritdoc
289     */
290    public function cancel(EntityInterface $entity): bool
291    {
292        return (bool)$this->updateStatus($entity->getId(), TaskLogInterface::STATUS_CANCELLED);
293    }
294
295    /**
296     * @inheritdoc
297     */
298    public function archiveCollection(CollectionInterface $collection): int
299    {
300        return $this->updateCollectionStatus($collection, TaskLogInterface::STATUS_ARCHIVED);
301    }
302
303    /**
304     * @inheritdoc
305     */
306    public function cancelCollection(CollectionInterface $collection): int
307    {
308        return $this->updateCollectionStatus($collection, TaskLogInterface::STATUS_CANCELLED);
309    }
310
311    /**
312     * @inheritdoc
313     */
314    public function deleteById(string $taskId): bool
315    {
316        $this->getPersistence()->getPlatform()->beginTransaction();
317
318        try {
319            $qb = $this->getQueryBuilder()
320                ->delete($this->getTableName())
321                ->where(self::COLUMN_ID . ' = :id')
322                ->setParameter('id', $taskId);
323
324            $qb->execute();
325            $this->getPersistence()->getPlatform()->commit();
326        } catch (Exception $e) {
327            $this->getPersistence()->getPlatform()->rollBack();
328
329            return false;
330        }
331
332        return true;
333    }
334
335    /**
336     * @param string $statusColumn
337     * @param array $inStatuses
338     * @return string
339     */
340    private function buildCounterStatusSql($statusColumn, array $inStatuses)
341    {
342        if (empty($inStatuses)) {
343            return '';
344        }
345
346        $sql =  "COUNT( CASE WHEN ";
347        foreach ($inStatuses as $status) {
348            if ($status !== reset($inStatuses)) {
349                $sql .= " OR " . self::COLUMN_STATUS . " = '" . $status . "'";
350            } else {
351                $sql .= " " . self::COLUMN_STATUS . " = '" . $status . "'";
352            }
353        }
354
355        $sql .= " THEN 0 END ) AS $statusColumn";
356
357        return $sql;
358    }
359
360    /**
361     * @inheritdoc
362     */
363    public function getTableName(): string
364    {
365        return strtolower(QueueDispatcherInterface::QUEUE_PREFIX . '_' . $this->containerName);
366    }
367
368    /**
369     * @param CollectionInterface $collection
370     * @param string $status
371     * @return int Number of rows updated
372     */
373    private function updateCollectionStatus(CollectionInterface $collection, $status)
374    {
375        $this->getPersistence()->getPlatform()->beginTransaction();
376
377        try {
378            $qb = $this->getQueryBuilder()
379                ->update($this->getTableName())
380                ->set(self::COLUMN_STATUS, ':status_new')
381                ->set(self::COLUMN_UPDATED_AT, ':updated_at')
382                ->where(self::COLUMN_ID . ' IN(:id)')
383                ->setParameter('id', $collection->getIds(), Connection::PARAM_STR_ARRAY)
384                ->setParameter('status_new', (string) $status)
385                ->setParameter('updated_at', $this->getPersistence()->getPlatForm()->getNowExpression());
386
387            $exec = $qb->execute();
388            $this->getPersistence()->getPlatform()->commit();
389        } catch (Exception $e) {
390            $this->getPersistence()->getPlatform()->rollBack();
391            $this->logDebug($e->getMessage());
392
393            return false;
394        }
395
396        return $exec;
397    }
398
399    protected function getPersistence(): Persistence
400    {
401        if ($this->persistence === null) {
402            $this->persistence = $this->getServiceLocator()
403                ->get(PersistenceManager::SERVICE_ID)
404                ->getPersistenceById($this->persistenceId);
405        }
406
407        return $this->persistence;
408    }
409
410
411    protected function getQueryBuilder(): QueryBuilder
412    {
413        return $this->getPersistence()->getPlatform()->getQueryBuilder();
414    }
415}