# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from contextlib import contextmanager
import logging
import os
import select
from threading import Thread
import time
from datetime import datetime
from tempfile import NamedTemporaryFile
from itertools import chain, combinations, product
from typing import List, Dict, Tuple, Iterable

from alembic import command
from alembic.config import Config
from sqlalchemy import and_, create_engine, event, func, or_, text, union, literal_column
from sqlalchemy.orm.session import sessionmaker

from buildgrid._exceptions import DatabaseError
from buildgrid._protos.google.longrunning import operations_pb2
from ...._enums import LeaseState, MetricCategories, OperationStage
from ....settings import MAX_JOB_BLOCK_TIME
from ....utils import JobState, hash_from_dict, convert_values_to_sorted_lists
from ..interface import DataStoreInterface
from .models import digest_to_string, Job, Lease, Operation

Session = sessionmaker()


def sqlite_on_connect(conn, record):
    # Use write-ahead logging for better concurrent access, and relax the
    # synchronous level to NORMAL (a common pairing with WAL) to reduce
    # fsync overhead.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")


class SQLDataStore(DataStoreInterface):

    def __init__(self, storage, *, connection_string=None, automigrate=False,
                 connection_timeout=5, poll_interval=1, **kwargs):
        super().__init__()
        self.__logger = logging.getLogger(__name__)
        self.__logger.info("Creating SQL scheduler with: "
                           f"automigrate=[{automigrate}], connection_timeout=[{connection_timeout}], "
                           f"poll_interval=[{poll_interval}], kwargs=[{kwargs}]")

        self.storage = storage
        self.response_cache = {}
        self.connection_timeout = connection_timeout
        self.poll_interval = poll_interval
        self.watcher = Thread(name="JobWatcher", target=self.wait_for_job_updates, daemon=True)
        self.watcher_keep_running = True

        # Set up a temporary SQLite database when no connection string is specified
        if not connection_string:
            tmpdbfile = NamedTemporaryFile(prefix='bgd-', suffix='.db')
            self._tmpdbfile = tmpdbfile  # Make sure to keep this tempfile for the lifetime of this object
            self.__logger.warning("No connection string specified for the DataStore, "
                                  f"will use SQLite with tempfile: [{tmpdbfile.name}]")
            automigrate = True  # since this is a temporary database, we always need to create it
            connection_string = f"sqlite:///{tmpdbfile.name}"

        self._create_sqlalchemy_engine(connection_string, automigrate, connection_timeout, **kwargs)

        # Make a test query against the database to ensure the connection is valid
        with self.session(reraise=True) as session:
            session.query(Job).first()

        self.watcher.start()

        self.capabilities_cache = {}

    def _create_sqlalchemy_engine(self, connection_string, automigrate, connection_timeout, **kwargs):
        self.automigrate = automigrate

        # Disallow SQLite in-memory because multi-threaded access to it is
        # complex and potentially problematic at best
        # ref: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#threading-pooling-behavior
        if self._is_sqlite_inmemory_connection_string(connection_string):
            raise ValueError(
                f"Cannot use SQLite in-memory with BuildGrid (connection_string=[{connection_string}]). "
                "Use a file or leave the connection_string empty for a tempfile.")

        if connection_timeout is not None:
            if "connect_args" not in kwargs:
                kwargs["connect_args"] = {}
            if self._is_sqlite_connection_string(connection_string):
                kwargs["connect_args"]["timeout"] = connection_timeout
            else:
                kwargs["connect_args"]["connect_timeout"] = connection_timeout

        # Only pass the (known) kwargs that have been explicitly set by the user
        available_options = set(['pool_size', 'max_overflow', 'pool_timeout', 'connect_args'])
        kwargs_keys = set(kwargs.keys())
        if not kwargs_keys.issubset(available_options):
            unknown_options = kwargs_keys - available_options
            raise TypeError(f"Unknown keyword arguments: [{unknown_options}]")

        self.__logger.debug(f"SQLAlchemy additional kwargs: [{kwargs}]")

        self.engine = create_engine(connection_string, echo=False, **kwargs)
        Session.configure(bind=self.engine)

        if self.engine.dialect.name == "sqlite":
            event.listen(self.engine, "connect", sqlite_on_connect)

        if self.automigrate:
            self._create_or_migrate_db(connection_string)

    def _is_sqlite_connection_string(self, connection_string):
        if connection_string:
            return connection_string.startswith("sqlite")
        return False

    def _is_sqlite_inmemory_connection_string(self, full_connection_string):
        if self._is_sqlite_connection_string(full_connection_string):
            # Valid connection_strings for in-memory SQLite which we don't support could look like:
            # "sqlite:///file:memdb1?option=value&cache=shared&mode=memory",
            # "sqlite:///file:memdb1?mode=memory&cache=shared",
            # "sqlite:///file:memdb1?cache=shared&mode=memory",
            # "sqlite:///file::memory:?cache=shared",
            # "sqlite:///file::memory:",
            # "sqlite:///:memory:",
            # "sqlite:///",
            # "sqlite://"
            # ref: https://www.sqlite.org/inmemorydb.html
            # Note that a user can also specify drivers, so the prefix could become 'sqlite+driver:///'
            connection_string = full_connection_string

            uri_split_index = connection_string.find("?")
            if uri_split_index != -1:
                connection_string = connection_string[0:uri_split_index]

            if connection_string.endswith((":memory:", ":///", "://")):
                return True
            elif uri_split_index != -1:
                opts = full_connection_string[uri_split_index + 1:].split("&")
                if "mode=memory" in opts:
                    return True

        return False

    def __repr__(self):
        return f"SQL data store interface for `{repr(self.engine.url)}`"

    def activate_monitoring(self):
        # Don't do anything. This function needs to exist but there's no
        # need to actually toggle monitoring in this implementation.
        pass

    def deactivate_monitoring(self):
        # Don't do anything. This function needs to exist but there's no
        # need to actually toggle monitoring in this implementation.
        pass

    def _create_or_migrate_db(self, connection_string):
        self.__logger.warning("Will attempt migration to latest version if needed.")

        config = Config()
        config.set_main_option("script_location", os.path.join(os.path.dirname(__file__), "alembic"))

        with self.engine.begin() as connection:
            config.attributes['connection'] = connection
            command.upgrade(config, "head")

    @contextmanager
    def session(self, *, sqlite_lock_immediately=False, reraise=False):
        # Try to obtain a session
        try:
            session = Session()
            if sqlite_lock_immediately and session.bind.name == "sqlite":
                session.execute("BEGIN IMMEDIATE")
        except Exception as e:
            self.__logger.error("Unable to obtain a database session.", exc_info=True)
            raise DatabaseError("Unable to obtain a database session.") from e

        # Yield the session and catch exceptions that occur while using it,
        # rolling back if needed
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            self.__logger.error("Error committing database session. Rolling back.", exc_info=True)
            if reraise:
                raise
        finally:
            session.close()

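    # A sketch of how the context manager above is typically used (this mirrors
    # the connection check in __init__): exceptions raised inside the block
    # trigger a rollback and are re-raised only when reraise=True.
    #
    #     with self.session(reraise=True) as session:
    #         session.query(Job).first()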

    def _get_job(self, job_name, session, with_for_update=False):
        jobs = session.query(Job)
        if with_for_update:
            jobs = jobs.with_for_update()
        jobs = jobs.filter_by(name=job_name)
        return jobs.first()

    def _check_job_timeout(self, job_internal, *, max_execution_timeout=None):
        """ Lazily check the maximum allowed job execution timeout when clients
        try to retrieve an existing job. If the job is found to have exceeded
        the timeout on access, cancel it along with its related operations and
        leases.

        Returns the `buildgrid.server.Job` object, possibly updated with `cancelled=True`.
        """
        if job_internal and max_execution_timeout and job_internal.worker_start_timestamp_as_datetime:
            if job_internal.operation_stage == OperationStage.EXECUTING:
                executing_duration = datetime.utcnow() - job_internal.worker_start_timestamp_as_datetime
                if executing_duration.total_seconds() >= max_execution_timeout:
                    self.__logger.warning(f"Job=[{job_internal}] has been executing for "
                                          f"executing_duration=[{executing_duration}]. "
                                          f"max_execution_timeout=[{max_execution_timeout}]. "
                                          "Cancelling.")
                    job_internal.cancel_all_operations(data_store=self)
                    self.__logger.info(f"Job=[{job_internal}] has been cancelled.")
        return job_internal

    def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
        try:
            with self.session() as session:
                jobs = session.query(Job).filter_by(action_digest=digest_to_string(action_digest))
                jobs = jobs.filter(Job.stage != OperationStage.COMPLETED.value)
                job = jobs.first()
                if job:
                    internal_job = job.to_internal_job(self)
                    return self._check_job_timeout(internal_job, max_execution_timeout=max_execution_timeout)
        except DatabaseError:
            raise
        return None

    def get_job_by_name(self, name, *, max_execution_timeout=None):
        try:
            with self.session() as session:
                job = self._get_job(name, session)
                if job:
                    internal_job = job.to_internal_job(self)
                    return self._check_job_timeout(internal_job, max_execution_timeout=max_execution_timeout)
        except DatabaseError:
            raise
        return None

    def get_job_by_operation(self, operation_name, *, max_execution_timeout=None):
        with self.session() as session:
            operation = self._get_operation(operation_name, session)
            if not operation:
                return None
            job = operation.job
            if job:
                internal_job = job.to_internal_job(self)
                return self._check_job_timeout(internal_job, max_execution_timeout=max_execution_timeout)

    def get_all_jobs(self):
        with self.session() as session:
            jobs = session.query(Job).filter(Job.stage != OperationStage.COMPLETED.value)
            return [j.to_internal_job(self) for j in jobs]

    def get_jobs_by_stage(self, operation_stage):
        with self.session() as session:
            jobs = session.query(Job).filter(Job.stage == operation_stage.value)
            return [j.to_internal_job(self, no_result=True) for j in jobs]

    def create_job(self, job):
        with self.session() as session:
            if self._get_job(job.name, session) is None:
                # Convert requirements values to sorted lists to make them json-serializable
                platform_requirements = job.platform_requirements
                convert_values_to_sorted_lists(job.platform_requirements)
                # Serialize the requirements
                platform_requirements_hash = hash_from_dict(platform_requirements)

                session.add(Job(
                    name=job.name,
                    action_digest=digest_to_string(job.action_digest),
                    do_not_cache=job.do_not_cache,
                    priority=job.priority,
                    operations=[],
                    platform_requirements=platform_requirements_hash,
                    stage=job.operation_stage.value,
                    queued_timestamp=job.queued_timestamp_as_datetime,
                    queued_time_duration=job.queued_time_duration.seconds,
                    worker_start_timestamp=job.worker_start_timestamp_as_datetime,
                    worker_completed_timestamp=job.worker_completed_timestamp_as_datetime
                ))
                self.response_cache[job.name] = job.execute_response

    def queue_job(self, job_name):
        with self.session(sqlite_lock_immediately=True) as session:
            job = self._get_job(job_name, session, with_for_update=True)
            job.assigned = False

    def update_job(self, job_name, changes):
        if "result" in changes:
            changes["result"] = digest_to_string(changes["result"])
        if "action_digest" in changes:
            changes["action_digest"] = digest_to_string(changes["action_digest"])

        with self.session() as session:
            job = self._get_job(job_name, session)
            job.update(changes)
            if self.engine.dialect.name == "postgresql":
                # This NOTIFY pairs with the "LISTEN job_updated" connection set
                # up in _listen_for_updates(), letting PostgreSQL deployments
                # learn about job updates without polling.
                conn = session.connection()
                conn.execute(f"NOTIFY job_updated, '{job_name}';")

    def delete_job(self, job_name):
        if job_name in self.response_cache:
            del self.response_cache[job_name]

    def wait_for_job_updates(self):
        self.__logger.info("Starting job watcher thread")
        if self.engine.dialect.name == "postgresql":
            self._listen_for_updates()
        else:
            self._poll_for_updates()

    def _listen_for_updates(self):
        def _listen_loop():
            try:
                conn = self.engine.connect()
                conn.execute(text("LISTEN job_updated;").execution_options(autocommit=True))
            except Exception as e:
                raise DatabaseError("Could not start listening to DB for job updates") from e

            while self.watcher_keep_running:
                # Wait until the connection is ready for reading. Timeout after 5 seconds
                # and try again if there was nothing to read. If the connection becomes
                # readable, collect the notifications it has received and handle them.
                #
                # See http://initd.org/psycopg/docs/advanced.html#async-notify
                if select.select([conn.connection], [], [], 5) == ([], [], []):
                    pass
                else:
                    try:
                        conn.connection.poll()
                    except Exception as e:
                        raise DatabaseError("Error while polling for job updates") from e

                    while conn.connection.notifies:
                        notify = conn.connection.notifies.pop()
                        with self.watched_jobs_lock:
                            spec = self.watched_jobs.get(notify.payload)
                            if spec is not None:
                                new_job = self.get_job_by_name(notify.payload)
                                new_state = JobState(new_job)
                                if spec.last_state != new_state:
                                    spec.last_state = new_state
                                    spec.event.notify_change()

        while self.watcher_keep_running:
            # Wait a few seconds if a database exception occurs and then try again;
            # this could be a short disconnect
            try:
                _listen_loop()
            except DatabaseError as e:
                self.__logger.warning(f"JobWatcher encountered exception: [{e}]; "
                                      f"Retrying in poll_interval=[{self.poll_interval}] seconds.")
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                time.sleep(self.poll_interval)

    def _get_watched_jobs(self):
        with self.session() as sess:
            jobs = sess.query(Job).filter(
                Job.name.in_(self.watched_jobs)
            )
            return [job.to_internal_job(self) for job in jobs.all()]

    def _poll_for_updates(self):
        def _poll_loop():
            while self.watcher_keep_running:
                time.sleep(self.poll_interval)
                if self.watcher_keep_running:
                    with self.watched_jobs_lock:
                        if self.watcher_keep_running:
                            try:
                                watched_jobs = self._get_watched_jobs()
                            except Exception as e:
                                raise DatabaseError("Couldn't retrieve watched jobs from DB") from e

                            for new_job in watched_jobs:
                                if self.watcher_keep_running:
                                    spec = self.watched_jobs[new_job.name]
                                    new_state = JobState(new_job)
                                    if spec.last_state != new_state:
                                        spec.last_state = new_state
                                        spec.event.notify_change()

        while self.watcher_keep_running:
            # Wait a few seconds if a database exception occurs and then try again
            try:
                _poll_loop()
            except DatabaseError as e:
                self.__logger.warning(f"JobWatcher encountered exception: [{e}]; "
                                      f"Retrying in poll_interval=[{self.poll_interval}] seconds.")
                # Sleep for a bit so that we give enough time for the
                # database to potentially recover
                time.sleep(self.poll_interval)

    def store_response(self, job):
        digest = self.storage.put_message(job.execute_response)
        self.update_job(job.name, {"result": digest, "status_code": job.execute_response.status.code})
        self.response_cache[job.name] = job.execute_response

    def _get_operation(self, operation_name, session):
        operations = session.query(Operation).filter_by(name=operation_name)
        return operations.first()

    def get_operations_by_stage(self, operation_stage):
        with self.session() as session:
            operations = session.query(Operation)
            operations = operations.filter(Operation.job.has(stage=operation_stage.value))
            operations = operations.all()
            # Return a set of job names here for now, to match the `MemoryDataStore`
            # implementation's behaviour
            return set(op.job.name for op in operations)

    def get_all_operations(self) -> List[operations_pb2.Operation]:
        with self.session() as session:
            operations = session.query(Operation)
            operations = operations.filter(~Operation.job.has(stage=OperationStage.COMPLETED.value))  # type: ignore
            return [operation.to_protobuf() for operation in operations]

    def create_operation(self, operation, job_name):
        with self.session() as session:
            session.add(Operation(
                name=operation.name,
                job_name=job_name
            ))

    def update_operation(self, operation_name, changes):
        with self.session() as session:
            operation = self._get_operation(operation_name, session)
            operation.update(changes)

    def delete_operation(self, operation_name):
        # Don't do anything. This function needs to exist but there's no
        # need to actually delete operations in this implementation.
        pass

    def get_leases_by_state(self, lease_state):
        with self.session() as session:
            leases = session.query(Lease).filter_by(state=lease_state.value)
            leases = leases.all()
            # `lease.job_name` is the same as `lease.id` for a Lease protobuf
            return set(lease.job_name for lease in leases)

    def get_metrics(self):

        def _get_query_leases_by_state(session, category):
            # Using func.count here to avoid generating a subquery in the WHERE
            # clause of the resulting query.
            # https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.count
            query = session.query(literal_column(category).label("category"),
                                  Lease.state.label("bucket"),
                                  func.count(Lease.id).label("value"))
            query = query.group_by(Lease.state)
            return query

        def _cb_query_leases_by_state(leases_by_state):
            # The database only returns counts > 0, so fill in the gaps
            for state in LeaseState:
                if state.value not in leases_by_state:
                    leases_by_state[state.value] = 0
            return leases_by_state

        def _get_query_operations_by_stage(session, category):
            # Using func.count here to avoid generating a subquery in the WHERE
            # clause of the resulting query.
            # https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.count
            query = session.query(literal_column(category).label("category"),
                                  Job.stage.label("bucket"),
                                  func.count(Operation.name).label("value"))
            query = query.join(Job)
            query = query.group_by(Job.stage)
            return query

        def _cb_query_operations_by_stage(operations_by_stage):
            # The database only returns counts > 0, so fill in the gaps
            for stage in OperationStage:
                if stage.value not in operations_by_stage:
                    operations_by_stage[stage.value] = 0
            return operations_by_stage

        def _get_query_jobs_by_stage(session, category):
            # Using func.count here to avoid generating a subquery in the WHERE
            # clause of the resulting query.
            # https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.count
            query = session.query(literal_column(category).label("category"),
                                  Job.stage.label("bucket"),
                                  func.count(Job.name).label("value"))
            query = query.group_by(Job.stage)
            return query

        def _cb_query_jobs_by_stage(jobs_by_stage):
            # The database only returns counts > 0, so fill in the gaps
            for stage in OperationStage:
                if stage.value not in jobs_by_stage:
                    jobs_by_stage[stage.value] = 0
            return jobs_by_stage

        metrics = {}
        try:
            with self.session() as session:
                # metrics to gather: (category_name, function_returning_query, callback_function)
                metrics_to_gather = [(MetricCategories.LEASES.value, _get_query_leases_by_state,
                                      _cb_query_leases_by_state),
                                     (MetricCategories.OPERATIONS.value, _get_query_operations_by_stage,
                                      _cb_query_operations_by_stage),
                                     (MetricCategories.JOBS.value, _get_query_jobs_by_stage,
                                      _cb_query_jobs_by_stage)]

                union_query = union(*[query_fn(session, f"'{category}'")
                                      for category, query_fn, _ in metrics_to_gather])
                union_results = session.execute(union_query).fetchall()

                grouped_results = {category: {} for category, _, _ in union_results}
                for category, bucket, value in union_results:
                    grouped_results[category][bucket] = value

                for category, _, category_cb in metrics_to_gather:
                    metrics[category] = category_cb(grouped_results.setdefault(category, {}))
        except DatabaseError:
            self.__logger.warning("Unable to gather metrics due to a Database Error.")
            return {}

        return metrics

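    # For reference, the dictionary built by get_metrics() is shaped roughly as
    # follows, with buckets that have no rows filled in as 0 by the callbacks:
    #
    #     {
    #         MetricCategories.LEASES.value: {<LeaseState value>: <count>, ...},
    #         MetricCategories.OPERATIONS.value: {<OperationStage value>: <count>, ...},
    #         MetricCategories.JOBS.value: {<OperationStage value>: <count>, ...},
    #     }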

    def _create_lease(self, lease, session, job=None):
        if job is None:
            job = self._get_job(lease.id, session)
            job = job.to_internal_job(self)
        session.add(Lease(
            job_name=lease.id,
            state=lease.state,
            status=None,
            worker_name=job.worker_name
        ))

    def create_lease(self, lease):
        with self.session() as session:
            self._create_lease(lease, session)

    def update_lease(self, job_name, changes):
        with self.session() as session:
            job = self._get_job(job_name, session)
            lease = job.active_leases[0]
            lease.update(changes)

    def load_unfinished_jobs(self):
        with self.session() as session:
            jobs = session.query(Job)
            jobs = jobs.filter(Job.stage != OperationStage.COMPLETED.value)
            jobs = jobs.order_by(Job.priority)
            return [j.to_internal_job(self) for j in jobs.all()]

    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
        """Return a list of leases for the highest priority jobs that can be run by a worker.

        NOTE: Currently the list only ever has one or zero leases.

        Query the jobs table to find queued jobs which match the capabilities of
        a given worker, and return the one with the highest priority. Takes a
        dictionary of worker capabilities to compare with job requirements.

        :param capabilities: Dictionary of worker capabilities to compare
            with job requirements when finding a job.
        :type capabilities: dict
        :param callback: Function to run on the next runnable job; should return
            a list of leases.
        :type callback: function
        :param timeout: Time to wait for new jobs; capped at MAX_JOB_BLOCK_TIME
            if longer.
        :type timeout: int
        :returns: List of leases

        """
        if not timeout:
            return self._assign_job_leases(capabilities, callback)

        # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
        timeout = min(timeout, MAX_JOB_BLOCK_TIME)

        start = time.time()
        while time.time() + self.connection_timeout + 1 - start < timeout:
            leases = self._assign_job_leases(capabilities, callback)
            if leases:
                return leases
            time.sleep(0.5)
        if self.connection_timeout > timeout:
            self.__logger.warning(
                "Not providing any leases to the worker because the database connection "
                "timeout is longer than the remaining time to handle the request. "
                "Increase the worker's timeout to solve this problem.")
        return []

    def flatten_capabilities(self, capabilities: Dict[str, List[str]]) -> List[Tuple[str, str]]:
        """ Flatten a capabilities dictionary, assuming all of its values are lists. E.g.

            {'OSFamily': ['Linux'], 'ISA': ['x86-32', 'x86-64']}

        becomes

            [('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')] """
        return [
            (name, value) for name, value_list in capabilities.items()
            for value in value_list
        ]

    def get_partial_capabilities(self, capabilities: Dict[str, List[str]]) -> Iterable[Dict[str, List[str]]]:
        """ Given a capabilities dictionary with all values as lists,
        yield all partial capabilities dictionaries. """
        CAPABILITIES_WARNING_THRESHOLD = 10

        caps_flat = self.flatten_capabilities(capabilities)

        if len(caps_flat) > CAPABILITIES_WARNING_THRESHOLD:
            self.__logger.warning(
                "A worker with a large capabilities dictionary has been connected. "
                f"Processing its capabilities may take a while. Capabilities: {capabilities}")

        # Using the itertools powerset recipe, construct the powerset of the tuples
        capabilities_powerset = chain.from_iterable(combinations(caps_flat, r) for r in range(len(caps_flat) + 1))
        for partial_capability_tuples in capabilities_powerset:
            partial_dict: Dict[str, List[str]] = {}

            for tup in partial_capability_tuples:
                partial_dict.setdefault(tup[0], []).append(tup[1])
            yield partial_dict

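    # For illustration, a worker declaring the (hypothetical) capabilities
    # {'OSFamily': ['Linux'], 'ISA': ['x86-64']} would make the generator above
    # yield four partial dictionaries:
    #
    #     {}
    #     {'OSFamily': ['Linux']}
    #     {'ISA': ['x86-64']}
    #     {'OSFamily': ['Linux'], 'ISA': ['x86-64']}
    #
    # i.e. the powerset of the flattened capability tuples, regrouped by key.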

    def get_partial_capabilities_hashes(self, capabilities: Dict) -> List[str]:
        """ Given a capabilities dictionary, obtain each partial capabilities
        dictionary, hash each of them, compile the hashes into a list, and
        return the result. """
        # Convert requirements values to sorted lists to make them json-serializable
        convert_values_to_sorted_lists(capabilities)

        # Check to see if we've cached this value
        capabilities_digest = hash_from_dict(capabilities)
        try:
            return self.capabilities_cache[capabilities_digest]
        except KeyError:
            # On cache miss, expand the capabilities into each possible partial capabilities dictionary
            capabilities_list = []
            for partial_capability in self.get_partial_capabilities(capabilities):
                capabilities_list.append(hash_from_dict(partial_capability))

            self.capabilities_cache[capabilities_digest] = capabilities_list
            return capabilities_list

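    # The hashes returned above are what _assign_job_leases() compares against
    # the Job.platform_requirements column: a queued job can be assigned to a
    # worker when the hash of the job's requirements matches any of the
    # worker's partial-capabilities hashes.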

    def _assign_job_leases(self, capabilities, callback):
        # Hash the capabilities
        capabilities_config_hashes = self.get_partial_capabilities_hashes(capabilities)
        try:
            with self.session(sqlite_lock_immediately=True) as session:
                jobs = session.query(Job).with_for_update(skip_locked=True)
                jobs = jobs.filter(Job.stage == OperationStage.QUEUED.value)
                jobs = jobs.filter(Job.assigned != True)  # noqa
                jobs = jobs.filter(Job.platform_requirements.in_(capabilities_config_hashes))
                job = jobs.order_by(Job.priority, Job.queued_timestamp).first()
                # This worker can take this job if it can handle all of its configurations
                if job:
                    internal_job = job.to_internal_job(self)
                    leases = callback(internal_job)
                    if leases:
                        job.assigned = True
                        job.worker_start_timestamp = internal_job.worker_start_timestamp_as_datetime
                        for lease in leases:
                            self._create_lease(lease, session, job=internal_job)
                    return leases

        except DatabaseError:
            self.__logger.warning("Will not assign any leases this time due to a Database Error.")

        return []