Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
56.61% |
107 / 189 |
|
31.58% |
6 / 19 |
CRAP | |
0.00% |
0 / 1 |
RdsTaskLogBroker | |
56.61% |
107 / 189 |
|
31.58% |
6 / 19 |
128.41 | |
0.00% |
0 / 1 |
__construct | |
100.00% |
2 / 2 |
|
100.00% |
1 / 1 |
1 | |||
getTaskExecutionTimesByDateRange | |
94.12% |
32 / 34 |
|
0.00% |
0 / 1 |
4.00 | |||
createContainer | |
100.00% |
31 / 31 |
|
100.00% |
1 / 1 |
3 | |||
add | |
93.75% |
15 / 16 |
|
0.00% |
0 / 1 |
4.00 | |||
getTypes | |
100.00% |
12 / 12 |
|
100.00% |
1 / 1 |
1 | |||
__toPhpCode | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
updateStatus | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
6 | |||
addReport | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
2 | |||
getStats | |
0.00% |
0 / 19 |
|
0.00% |
0 / 1 |
2 | |||
archive | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
cancel | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
archiveCollection | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
cancelCollection | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
deleteById | |
72.73% |
8 / 11 |
|
0.00% |
0 / 1 |
2.08 | |||
buildCounterStatusSql | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
20 | |||
getTableName | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
updateCollectionStatus | |
0.00% |
0 / 16 |
|
0.00% |
0 / 1 |
6 | |||
getPersistence | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
2 | |||
getQueryBuilder | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2020 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | declare(strict_types=1); |
23 | |
24 | namespace oat\tao\model\taskQueue\TaskLog\Broker; |
25 | |
26 | use common_persistence_sql_SchemaManager; |
27 | use common_persistence_SqlPersistence as SqlPersistence; |
28 | use common_persistence_Persistence as Persistence; |
29 | use common_Utils; |
30 | use Doctrine\DBAL\Connection; |
31 | use Doctrine\DBAL\ParameterType; |
32 | use Doctrine\DBAL\Query\QueryBuilder; |
33 | use common_report_Report as Report; |
34 | use Exception; |
35 | use DateTime; |
36 | use oat\generis\persistence\PersistenceManager; |
37 | use oat\tao\model\taskQueue\QueueDispatcherInterface; |
38 | use oat\tao\model\taskQueue\Task\CallbackTaskInterface; |
39 | use oat\tao\model\taskQueue\Task\TaskInterface; |
40 | use oat\tao\model\taskQueue\TaskLog\CategorizedStatus; |
41 | use oat\tao\model\taskQueue\TaskLog\CollectionInterface; |
42 | use oat\tao\model\taskQueue\TaskLog\Entity\EntityInterface; |
43 | use oat\tao\model\taskQueue\TaskLog\TaskLogFilter; |
44 | use oat\tao\model\taskQueue\TaskLog\TasksLogsStats; |
45 | use oat\tao\model\taskQueue\TaskLogInterface; |
46 | use oat\oatbox\log\LoggerAwareTrait; |
47 | |
48 | /** |
49 | * Storing message logs in RDS. |
50 | * |
51 | * @author Gyula Szucs <gyula@taotesting.com> |
52 | */ |
53 | class RdsTaskLogBroker extends AbstractTaskLogBroker |
54 | { |
55 | use LoggerAwareTrait; |
56 | |
57 | /** @var string */ |
58 | private $persistenceId; |
59 | |
60 | /** @var SqlPersistence */ |
61 | protected $persistence; |
62 | |
63 | public function __construct(string $persistenceId, string $containerName = null) |
64 | { |
65 | $this->persistenceId = $persistenceId; |
66 | $this->containerName = $containerName ?? self::DEFAULT_CONTAINER_NAME; |
67 | } |
68 | |
69 | public function getTaskExecutionTimesByDateRange(DateTime $from, DateTime $to): array |
70 | { |
71 | $collection = []; |
72 | |
73 | try { |
74 | $qb = $this->getQueryBuilder(); |
75 | $qb |
76 | ->select( |
77 | TaskLogBrokerInterface::COLUMN_ID, |
78 | TaskLogBrokerInterface::COLUMN_CREATED_AT, |
79 | TaskLogBrokerInterface::COLUMN_UPDATED_AT |
80 | ) |
81 | ->from($this->getTableName()) |
82 | ->where( |
83 | $qb->expr()->in( |
84 | TaskLogBrokerInterface::COLUMN_STATUS, |
85 | [ |
86 | $qb->expr()->literal(TaskLogInterface::STATUS_COMPLETED), |
87 | $qb->expr()->literal(TaskLogInterface::STATUS_ARCHIVED), |
88 | ] |
89 | ), |
90 | $qb->expr()->gte(TaskLogBrokerInterface::COLUMN_CREATED_AT, ':from'), |
91 | $qb->expr()->lte(TaskLogBrokerInterface::COLUMN_CREATED_AT, ':to') |
92 | ) |
93 | ->setParameters([ |
94 | 'from' => $from->format('Y-m-d H:i:s'), |
95 | 'to' => $to->format('Y-m-d H:i:s') |
96 | ]); |
97 | |
98 | $results = $qb->execute(); |
99 | |
100 | while (($row = $results->fetchAssociative()) !== false) { |
101 | if (empty($row[TaskLogBrokerInterface::COLUMN_UPDATED_AT])) { |
102 | continue; |
103 | } |
104 | $collection[$row[TaskLogBrokerInterface::COLUMN_ID]] = |
105 | strtotime($row[TaskLogBrokerInterface::COLUMN_UPDATED_AT]) |
106 | - strtotime($row[TaskLogBrokerInterface::COLUMN_CREATED_AT]); |
107 | } |
108 | } catch (Exception $exception) { |
109 | $this->logError('Searching for task logs failed with MSG: ' . $exception->getMessage()); |
110 | } |
111 | |
112 | return $collection; |
113 | } |
114 | |
115 | /** |
116 | * @inheritdoc |
117 | */ |
118 | public function createContainer(): void |
119 | { |
120 | /** @var common_persistence_sql_SchemaManager $schemaManager */ |
121 | $schemaManager = $this->getPersistence()->getSchemaManager(); |
122 | |
123 | $fromSchema = $schemaManager->createSchema(); |
124 | $toSchema = clone $fromSchema; |
125 | |
126 | // if our table does not exist, let's create it |
127 | if (false === $fromSchema->hasTable($this->getTableName())) { |
128 | $table = $toSchema->createTable($this->getTableName()); |
129 | $table->addOption('engine', 'InnoDB'); |
130 | $table->addColumn(self::COLUMN_ID, 'string', ["notnull" => true, "length" => 255]); |
131 | $table->addColumn( |
132 | self::COLUMN_PARENT_ID, |
133 | 'string', |
134 | ["notnull" => false, "length" => 255, "default" => null] |
135 | ); |
136 | $table->addColumn(self::COLUMN_TASK_NAME, 'string', ["notnull" => true, "length" => 255]); |
137 | $table->addColumn(self::COLUMN_PARAMETERS, 'text', ["notnull" => false, "default" => null]); |
138 | $table->addColumn(self::COLUMN_LABEL, 'string', ["notnull" => false, "length" => 255]); |
139 | $table->addColumn(self::COLUMN_STATUS, 'string', ["notnull" => true, "length" => 50]); |
140 | $table->addColumn(self::COLUMN_MASTER_STATUS, 'boolean', ["default" => 0]); |
141 | $table->addColumn(self::COLUMN_OWNER, 'string', ["notnull" => false, "length" => 255, "default" => null]); |
142 | $table->addColumn(self::COLUMN_REPORT, 'text', ["notnull" => false, "default" => null]); |
143 | $table->addColumn(self::COLUMN_CREATED_AT, 'datetime', ['notnull' => true]); |
144 | $table->addColumn(self::COLUMN_UPDATED_AT, 'datetime', ['notnull' => false]); |
145 | $table->setPrimaryKey([self::COLUMN_ID]); |
146 | $table->addIndex( |
147 | [self::COLUMN_TASK_NAME, self::COLUMN_OWNER], |
148 | $this->getTableName() . 'IDX_task_name_owner' |
149 | ); |
150 | $table->addIndex([self::COLUMN_STATUS], $this->getTableName() . 'IDX_status'); |
151 | $table->addIndex([self::COLUMN_CREATED_AT], $this->getTableName() . 'IDX_created_at'); |
152 | |
153 | $queries = $this->getPersistence()->getPlatForm()->getMigrateSchemaSql($fromSchema, $toSchema); |
154 | foreach ($queries as $query) { |
155 | $this->getPersistence()->exec($query); |
156 | } |
157 | } |
158 | } |
159 | |
160 | /** |
161 | * @inheritdoc |
162 | */ |
163 | public function add(TaskInterface $task, string $status, string $label = null): void |
164 | { |
165 | $this->getPersistence()->insert($this->getTableName(), [ |
166 | self::COLUMN_ID => (string) $task->getId(), |
167 | self::COLUMN_PARENT_ID => $task->getParentId() ? (string) $task->getParentId() : null, |
168 | self::COLUMN_TASK_NAME => $task instanceof CallbackTaskInterface && is_object($task->getCallable()) |
169 | ? get_class($task->getCallable()) |
170 | : get_class($task), |
171 | self::COLUMN_PARAMETERS => json_encode($task->getParameters()), |
172 | self::COLUMN_LABEL => (string) $label, |
173 | self::COLUMN_STATUS => $status, |
174 | self::COLUMN_OWNER => (string) $task->getOwner(), |
175 | self::COLUMN_CREATED_AT => $task->getCreatedAt()->format( |
176 | $this->getPersistence()->getPlatForm()->getDateTimeFormatString() |
177 | ), |
178 | self::COLUMN_UPDATED_AT => $this->getPersistence()->getPlatForm()->getNowExpression(), |
179 | self::COLUMN_MASTER_STATUS => $task->isMasterStatus(), |
180 | ], $this->getTypes()); |
181 | } |
182 | |
183 | protected function getTypes(): array |
184 | { |
185 | return [ |
186 | ParameterType::STRING, |
187 | ParameterType::STRING, |
188 | ParameterType::STRING, |
189 | ParameterType::STRING, |
190 | ParameterType::STRING, |
191 | ParameterType::STRING, |
192 | ParameterType::STRING, |
193 | ParameterType::STRING, |
194 | ParameterType::STRING, |
195 | ParameterType::BOOLEAN, |
196 | ]; |
197 | } |
198 | |
199 | public function __toPhpCode() |
200 | { |
201 | return 'new ' . get_called_class() . '(' |
202 | . common_Utils::toHumanReadablePhpString($this->persistenceId) |
203 | . ', ' |
204 | . common_Utils::toHumanReadablePhpString($this->containerName) |
205 | . ')'; |
206 | } |
207 | |
208 | /** |
209 | * @inheritdoc |
210 | */ |
211 | public function updateStatus(string $taskId, string $newStatus, string $prevStatus = null): int |
212 | { |
213 | $qb = $this->getQueryBuilder() |
214 | ->update($this->getTableName()) |
215 | ->set(self::COLUMN_STATUS, ':status_new') |
216 | ->set(self::COLUMN_UPDATED_AT, ':updated_at') |
217 | ->where(self::COLUMN_ID . ' = :id') |
218 | ->setParameter('id', (string) $taskId) |
219 | ->setParameter('status_new', (string) $newStatus) |
220 | ->setParameter('updated_at', $this->getPersistence()->getPlatForm()->getNowExpression()); |
221 | |
222 | if ($prevStatus) { |
223 | $qb->andWhere(self::COLUMN_STATUS . ' = :status_prev') |
224 | ->setParameter('status_prev', (string) $prevStatus); |
225 | } |
226 | |
227 | return $qb->execute(); |
228 | } |
229 | |
230 | /** |
231 | * @inheritdoc |
232 | */ |
233 | public function addReport(string $taskId, Report $report, string $newStatus = null): int |
234 | { |
235 | $qb = $this->getQueryBuilder() |
236 | ->update($this->getTableName()) |
237 | ->set(self::COLUMN_REPORT, ':report') |
238 | ->set(self::COLUMN_STATUS, ':status_new') |
239 | ->set(self::COLUMN_UPDATED_AT, ':updated_at') |
240 | ->andWhere(self::COLUMN_ID . ' = :id') |
241 | ->setParameter('id', (string) $taskId) |
242 | ->setParameter('report', json_encode($report)) |
243 | ->setParameter('status_new', (string) $newStatus) |
244 | ->setParameter('updated_at', $this->getPersistence()->getPlatForm()->getNowExpression()); |
245 | |
246 | return $qb->execute(); |
247 | } |
248 | |
249 | /** |
250 | * @inheritdoc |
251 | */ |
252 | public function getStats(TaskLogFilter $filter): TasksLogsStats |
253 | { |
254 | $qb = $this->getQueryBuilder() |
255 | ->from($this->getTableName()); |
256 | |
257 | $qb->select( |
258 | $this->buildCounterStatusSql( |
259 | TasksLogsStats::IN_PROGRESS_TASKS, |
260 | CategorizedStatus::getMappedStatuses(CategorizedStatus::STATUS_IN_PROGRESS) |
261 | ) . ', ' |
262 | . $this->buildCounterStatusSql( |
263 | TasksLogsStats::COMPLETED_TASKS, |
264 | CategorizedStatus::getMappedStatuses(CategorizedStatus::STATUS_COMPLETED) |
265 | ) . ', ' |
266 | . $this->buildCounterStatusSql( |
267 | TasksLogsStats::FAILED_TASKS, |
268 | CategorizedStatus::getMappedStatuses(CategorizedStatus::STATUS_FAILED) |
269 | ) |
270 | ); |
271 | |
272 | $filter->applyFilters($qb); |
273 | |
274 | $row = $qb->execute()->fetch(); |
275 | |
276 | return TasksLogsStats::buildFromArray($row); |
277 | } |
278 | |
279 | /** |
280 | * @inheritdoc |
281 | */ |
282 | public function archive(EntityInterface $entity): bool |
283 | { |
284 | return (bool)$this->updateStatus($entity->getId(), TaskLogInterface::STATUS_ARCHIVED); |
285 | } |
286 | |
287 | /** |
288 | * @inheritdoc |
289 | */ |
290 | public function cancel(EntityInterface $entity): bool |
291 | { |
292 | return (bool)$this->updateStatus($entity->getId(), TaskLogInterface::STATUS_CANCELLED); |
293 | } |
294 | |
295 | /** |
296 | * @inheritdoc |
297 | */ |
298 | public function archiveCollection(CollectionInterface $collection): int |
299 | { |
300 | return $this->updateCollectionStatus($collection, TaskLogInterface::STATUS_ARCHIVED); |
301 | } |
302 | |
303 | /** |
304 | * @inheritdoc |
305 | */ |
306 | public function cancelCollection(CollectionInterface $collection): int |
307 | { |
308 | return $this->updateCollectionStatus($collection, TaskLogInterface::STATUS_CANCELLED); |
309 | } |
310 | |
311 | /** |
312 | * @inheritdoc |
313 | */ |
314 | public function deleteById(string $taskId): bool |
315 | { |
316 | $this->getPersistence()->getPlatform()->beginTransaction(); |
317 | |
318 | try { |
319 | $qb = $this->getQueryBuilder() |
320 | ->delete($this->getTableName()) |
321 | ->where(self::COLUMN_ID . ' = :id') |
322 | ->setParameter('id', $taskId); |
323 | |
324 | $qb->execute(); |
325 | $this->getPersistence()->getPlatform()->commit(); |
326 | } catch (Exception $e) { |
327 | $this->getPersistence()->getPlatform()->rollBack(); |
328 | |
329 | return false; |
330 | } |
331 | |
332 | return true; |
333 | } |
334 | |
335 | /** |
336 | * @param string $statusColumn |
337 | * @param array $inStatuses |
338 | * @return string |
339 | */ |
340 | private function buildCounterStatusSql($statusColumn, array $inStatuses) |
341 | { |
342 | if (empty($inStatuses)) { |
343 | return ''; |
344 | } |
345 | |
346 | $sql = "COUNT( CASE WHEN "; |
347 | foreach ($inStatuses as $status) { |
348 | if ($status !== reset($inStatuses)) { |
349 | $sql .= " OR " . self::COLUMN_STATUS . " = '" . $status . "'"; |
350 | } else { |
351 | $sql .= " " . self::COLUMN_STATUS . " = '" . $status . "'"; |
352 | } |
353 | } |
354 | |
355 | $sql .= " THEN 0 END ) AS $statusColumn"; |
356 | |
357 | return $sql; |
358 | } |
359 | |
360 | /** |
361 | * @inheritdoc |
362 | */ |
363 | public function getTableName(): string |
364 | { |
365 | return strtolower(QueueDispatcherInterface::QUEUE_PREFIX . '_' . $this->containerName); |
366 | } |
367 | |
368 | /** |
369 | * @param CollectionInterface $collection |
370 | * @param string $status |
371 | * @return int Number of rows updated |
372 | */ |
373 | private function updateCollectionStatus(CollectionInterface $collection, $status) |
374 | { |
375 | $this->getPersistence()->getPlatform()->beginTransaction(); |
376 | |
377 | try { |
378 | $qb = $this->getQueryBuilder() |
379 | ->update($this->getTableName()) |
380 | ->set(self::COLUMN_STATUS, ':status_new') |
381 | ->set(self::COLUMN_UPDATED_AT, ':updated_at') |
382 | ->where(self::COLUMN_ID . ' IN(:id)') |
383 | ->setParameter('id', $collection->getIds(), Connection::PARAM_STR_ARRAY) |
384 | ->setParameter('status_new', (string) $status) |
385 | ->setParameter('updated_at', $this->getPersistence()->getPlatForm()->getNowExpression()); |
386 | |
387 | $exec = $qb->execute(); |
388 | $this->getPersistence()->getPlatform()->commit(); |
389 | } catch (Exception $e) { |
390 | $this->getPersistence()->getPlatform()->rollBack(); |
391 | $this->logDebug($e->getMessage()); |
392 | |
393 | return false; |
394 | } |
395 | |
396 | return $exec; |
397 | } |
398 | |
399 | protected function getPersistence(): Persistence |
400 | { |
401 | if ($this->persistence === null) { |
402 | $this->persistence = $this->getServiceLocator() |
403 | ->get(PersistenceManager::SERVICE_ID) |
404 | ->getPersistenceById($this->persistenceId); |
405 | } |
406 | |
407 | return $this->persistence; |
408 | } |
409 | |
410 | |
411 | protected function getQueryBuilder(): QueryBuilder |
412 | { |
413 | return $this->getPersistence()->getPlatform()->getQueryBuilder(); |
414 | } |
415 | } |