Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
2.54% |
3 / 118 |
|
0.00% |
0 / 13 |
CRAP | |
0.00% |
0 / 1 |
RdsQueueBroker | |
2.54% |
3 / 118 |
|
0.00% |
0 / 13 |
701.80 | |
0.00% |
0 / 1 |
__construct | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
2.06 | |||
__toPhpCode | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
getPersistence | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getTableName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
createQueue | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
30 | |||
push | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
doPop | |
0.00% |
0 / 27 |
|
0.00% |
0 / 1 |
30 | |||
delete | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
doDelete | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
6 | |||
getTaskByTaskLogId | |
0.00% |
0 / 21 |
|
0.00% |
0 / 1 |
12 | |||
changeTaskVisibility | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
count | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
6 | |||
getQueryBuilder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2017-2021 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | namespace oat\taoTaskQueue\model\QueueBroker; |
23 | |
24 | use Doctrine\DBAL\FetchMode; |
25 | use Doctrine\DBAL\Query\QueryBuilder; |
26 | use Doctrine\DBAL\Schema\SchemaException; |
27 | use Doctrine\DBAL\Schema\Schema; |
28 | use Doctrine\DBAL\Schema\AbstractSchemaManager; |
29 | use oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker; |
30 | use oat\tao\model\taskQueue\Task\TaskInterface; |
31 | use oat\taoTaskQueue\model\Task\CallbackTaskDecorator; |
32 | |
/**
 * Queue broker storing messages/tasks in a relational database table.
 *
 * Every queue is backed by one table (see getTableName()) holding the columns
 * `id`, `message`, `visible` and `created_at`. Rows waiting to be processed have
 * `visible = 1`; doPop() flags the rows it hands out with `visible = 0`, and
 * doDelete() only removes rows that carry that flag.
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
class RdsQueueBroker extends AbstractQueueBroker
{
    public const ID = 'rds';

    /**
     * @var string Id used to look up the persistence in the persistence manager.
     */
    private $persistenceId;

    /**
     * Lazily resolved by getPersistence(); null until first use.
     *
     * @var \common_persistence_SqlPersistence
     */
    protected $persistence;

    /**
     * RdsQueueBroker constructor.
     *
     * @param string $persistenceId Id of the SQL persistence backing the queue table.
     * @param int $receiveTasks Forwarded unchanged to the parent constructor.
     *
     * @throws \InvalidArgumentException When $persistenceId is empty.
     */
    public function __construct($persistenceId, $receiveTasks = 1)
    {
        parent::__construct($receiveTasks);

        if (empty($persistenceId)) {
            throw new \InvalidArgumentException("Persistence id needs to be set for " . __CLASS__);
        }

        $this->persistenceId = $persistenceId;
    }

    /**
     * Builds the PHP expression that re-creates this broker with the same
     * persistence id and number of tasks to receive.
     *
     * @return string
     */
    public function __toPhpCode()
    {
        $arguments = [
            \common_Utils::toHumanReadablePhpString($this->persistenceId),
            \common_Utils::toHumanReadablePhpString($this->getNumberOfTasksToReceive()),
        ];

        // static::class yields the late-static-bound class name, same as get_called_class().
        return 'new ' . static::class . '(' . implode(', ', $arguments) . ')';
    }

    /**
     * Returns the persistence, resolving it through the service locator on first use.
     *
     * @return \common_persistence_SqlPersistence
     */
    protected function getPersistence()
    {
        if (is_null($this->persistence)) {
            $this->persistence = $this->getServiceLocator()
                ->get(\common_persistence_Manager::SERVICE_ID)
                ->getPersistenceById($this->persistenceId);
        }

        return $this->persistence;
    }

    /**
     * Returns the name of the table backing this queue: the prefixed queue name in lower case.
     *
     * @return string
     */
    protected function getTableName()
    {
        return strtolower($this->getQueueNameWithPrefix());
    }

    /**
     * Note: this method can be run multiple times because only the migrate queries (result of getMigrateSchemaSql)
     * will be run.
     *
     * The target table definition is (re)built on a copy of the current schema and the difference
     * between the untouched schema and the modified one is executed statement by statement.
     *
     * @inheritdoc
     */
    public function createQueue()
    {
        $persistence = $this->getPersistence();
        $tableName = $this->getTableName();

        /** @var AbstractSchemaManager $schemaManager */
        $schemaManager = $persistence->getDriver()->getSchemaManager();

        /** @var Schema $schema */
        $schema = $schemaManager->createSchema();
        // Snapshot taken before any modification; used as the "from" side of the migration diff.
        $fromSchema = clone $schema;

        try {
            // Strict comparison: only an identical string counts as an existing table.
            // NOTE(review): assumes getTables() yields table names as strings - confirm.
            if (in_array($tableName, $schemaManager->getTables(), true)) {
                $schema->dropTable($tableName);
            }

            $table = $schema->createTable($tableName);
            $table->addOption('engine', 'InnoDB');
            $table->addColumn('id', 'integer', ["autoincrement" => true, "notnull" => true, "unsigned" => true]);
            $table->addColumn('message', 'text', ["notnull" => true]);
            $table->addColumn('visible', 'boolean', ["default" => 1]);
            $table->addColumn('created_at', 'datetime', ['notnull' => true]);
            $table->setPrimaryKey(['id']);
            $table->addIndex(['created_at', 'visible'], 'IDX_created_at_visible_' . $this->getQueueName());
        } catch (SchemaException $e) {
            $this->logDebug('Schema of ' . $tableName . ' table already up to date.');
        }

        $queries = $persistence->getPlatForm()->getMigrateSchemaSql($fromSchema, $schema);

        foreach ($queries as $query) {
            $persistence->exec($query);
        }

        if ($queries) {
            $this->logDebug('Queue ' . $tableName . ' created/updated in RDS.');
        }
    }

    /**
     * Insert a new task into the queue table.
     *
     * The serialized task is stored in `message`; `created_at` is set to the
     * platform's "now" expression.
     *
     * @param TaskInterface $task
     * @return bool The insert result cast to boolean.
     */
    public function push(TaskInterface $task)
    {
        $persistence = $this->getPersistence();

        return (bool) $persistence->insert($this->getTableName(), [
            'message' => $this->serializeTask($task),
            'created_at' => $persistence->getPlatForm()->getNowExpression()
        ]);
    }

    /**
     * Does the DBAL specific pop mechanism.
     *
     * Inside one transaction: selects up to getNumberOfTasksToReceive() visible rows ordered by
     * `created_at` with a write lock, flags them as invisible and hands every successfully
     * unserialized task to pushPreFetchedMessage(). Any exception rolls the transaction back
     * and is logged instead of being propagated.
     */
    protected function doPop()
    {
        $this->getPersistence()->getPlatform()->beginTransaction();

        $logContext = [
            'Queue' => $this->getQueueNameWithPrefix()
        ];

        try {
            $qb = $this->getQueryBuilder()
                ->select('id, message')
                ->from($this->getTableName())
                ->andWhere('visible = :visible')
                ->orderBy('created_at')
                ->setMaxResults($this->getNumberOfTasksToReceive());

            /**
             * SELECT ... FOR UPDATE is used for locking
             *
             * @see https://dev.mysql.com/doc/refman/5.6/en/innodb-locking-reads.html
             */
            $sql = $qb->getSQL() . ' ' . $this->getPersistence()->getPlatForm()->getWriteLockSQL();

            if ($dbResult = $this->getPersistence()->query($sql, ['visible' => 1])->fetchAll(\PDO::FETCH_ASSOC)) {
                // The ids are concatenated into the statement, so force them to integers first.
                $ids = array_map('intval', array_column($dbResult, 'id'));

                // set the received messages to invisible for other workers
                $qb = $this->getQueryBuilder()
                    ->update($this->getTableName())
                    ->set('visible', ':visible')
                    ->where('id IN (' . implode(',', $ids) . ')')
                    ->setParameter('visible', 0);

                $qb->execute();

                foreach ($dbResult as $row) {
                    // Rows whose message yields a falsy task are skipped.
                    if ($task = $this->unserializeTask($row['message'], $row['id'], $logContext)) {
                        // Remember the row id so delete() can remove the row later.
                        $task->setMetadata('RdsMessageId', $row['id']);
                        $this->pushPreFetchedMessage($task);
                    }
                }
            } else {
                $this->logDebug('No task in the queue.', $logContext);
            }

            $this->getPersistence()->getPlatform()->commit();
        } catch (\Exception $e) {
            $this->getPersistence()->getPlatform()->rollBack();
            $this->logError('Popping tasks failed with MSG: ' . $e->getMessage(), $logContext);
        }
    }

    /**
     * Delete the message after being processed by the worker.
     *
     * The row is identified by the `RdsMessageId` metadata set in doPop().
     *
     * @param TaskInterface $task
     */
    public function delete(TaskInterface $task)
    {
        $rdsMessageId = $task->getMetadata('RdsMessageId');

        $this->doDelete($rdsMessageId, [
            'InternalMessageId' => $task->getId(),
            'RdsMessageId' => $rdsMessageId
        ]);
    }

    /**
     * Deletes the row with the given id, provided it is flagged as invisible.
     *
     * Failures are logged with the given context and are not propagated.
     *
     * @param string $id Row id; cast to integer before being bound.
     * @param array $logContext Context passed to the error log on failure.
     * @return int Result of the DELETE statement (affected rows), 0 when the deletion failed.
     */
    protected function doDelete($id, array $logContext = [])
    {
        try {
            return (int) $this->getQueryBuilder()
                ->delete($this->getTableName())
                ->where('id = :id')
                ->andWhere('visible = :visible')
                ->setParameter('id', (int) $id)
                ->setParameter('visible', 0)
                ->execute();
        } catch (\Exception $e) {
            $this->logError('Deleting task failed with MSG: ' . $e->getMessage(), $logContext);
        }

        return 0;
    }

    /**
     * Finds the first queued task whose serialized message contains the given task log id.
     *
     * Only the part of the id starting at its first '#' is searched for; an id without '#'
     * is searched for as a whole.
     *
     * @TODO Make queue broker open/closed: https://oat-sa.atlassian.net/browse/ADF-556
     *
     * @param string $taskLogId
     * @return CallbackTaskDecorator|null Null when no row matches or the message cannot be turned into a task.
     */
    public function getTaskByTaskLogId(string $taskLogId): ?CallbackTaskDecorator
    {
        // strpos() returns false when there is no '#'; handle that explicitly instead of
        // relying on substr() treating false as offset 0.
        $hashPosition = strpos($taskLogId, '#');
        $logId = $hashPosition === false ? $taskLogId : substr($taskLogId, $hashPosition);

        // NOTE(review): '%' and '_' inside $logId act as LIKE wildcards; they are not escaped here.
        $row = $this->getQueryBuilder()
            ->select('id, message, visible, created_at')
            ->from($this->getTableName())
            ->andWhere('message LIKE :taskLogId')
            ->setParameter('taskLogId', "%$logId%")
            ->setMaxResults(1)
            ->execute()
            ->fetch(FetchMode::ASSOCIATIVE);

        if (!$row) {
            return null;
        }

        $task = $this->unserializeTask(
            $row['message'],
            $row['id'],
            [
                'Queue' => $this->getQueueNameWithPrefix()
            ]
        );

        if (!$task) {
            return null;
        }

        return new CallbackTaskDecorator($task, $row['id']);
    }

    /**
     * Sets the `visible` flag of the row with the given id.
     *
     * @TODO Make queue broker open/closed: https://oat-sa.atlassian.net/browse/ADF-556
     *
     * @param string $taskId Row id of the task in the queue table.
     * @param bool $visible Stored as 1 (true) or 0 (false).
     */
    public function changeTaskVisibility(string $taskId, bool $visible): void
    {
        $this->getQueryBuilder()
            ->update($this->getTableName())
            ->set('visible', ':visible')
            ->where('id = :id')
            ->setParameter('visible', (int)$visible)
            ->setParameter('id', $taskId)
            ->execute();
    }

    /**
     * Counts the visible rows of the queue table.
     *
     * @return int Number of visible rows; 0 when the query fails (the failure is logged).
     */
    public function count(): int
    {
        try {
            $qb = $this->getQueryBuilder()
                ->select('COUNT(id)')
                ->from($this->getTableName())
                ->andWhere('visible = :visible')
                ->setParameter('visible', 1);

            return (int) $qb->execute()->fetchColumn();
        } catch (\Exception $e) {
            $this->logError('Counting tasks failed with MSG: ' . $e->getMessage());
        }

        return 0;
    }

    /**
     * Returns a fresh query builder obtained from the persistence platform.
     *
     * @return QueryBuilder
     */
    private function getQueryBuilder()
    {
        return $this->getPersistence()->getPlatform()->getQueryBuilder();
    }
}