Coverage for /builds/BuildGrid/buildgrid/buildgrid/utils.py: 84.78%


# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from contextlib import contextmanager
from itertools import combinations
from urllib.parse import urljoin
from operator import attrgetter
from typing import Dict, Iterable, List, Mapping, Sequence, Set, Tuple, TypeVar, Union
import hashlib
import json
import os
import socket
import threading

from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT, DEFAULT_LOCK_ACQUIRE_TIMEOUT
from buildgrid._enums import JobEventType
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


T = TypeVar('T')


secure_uri_schemes = ["https", "grpcs"]
insecure_uri_schemes = ["http", "grpc"]

class BrowserURL:

    __url_markers = (
        '%(instance)s',
        '%(type)s',
        '%(hash)s',
        '%(sizebytes)s',
    )

    def __init__(self, base_url, instance_name=None):
        """Begins browser URL helper initialization."""
        self.__base_url = base_url
        self.__initialized = False
        self.__url_spec = {
            '%(instance)s': instance_name or '',
        }

    def for_message(self, message_type, message_digest):
        """Completes browser URL initialization for a protobuf message."""
        if self.__initialized:
            return False

        self.__url_spec['%(type)s'] = message_type
        self.__url_spec['%(hash)s'] = message_digest.hash
        self.__url_spec['%(sizebytes)s'] = str(message_digest.size_bytes)

        self.__initialized = True
        return True

    def generate(self):
        """Generates a browser URL string."""
        if not self.__base_url or not self.__initialized:
            return None

        url_tail = BROWSER_URL_FORMAT

        for url_marker in self.__url_markers:
            if url_marker not in self.__url_spec:
                return None
            if url_marker not in url_tail:
                continue
            url_tail = url_tail.replace(url_marker, self.__url_spec[url_marker])

        return urljoin(self.__base_url, url_tail)
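
# Illustrative usage sketch (not part of the original module): the base URL
# and instance name below are hypothetical, and BROWSER_URL_FORMAT decides
# which markers actually appear in the generated URL.
def _example_browser_url():
    digest = remote_execution_pb2.Digest(hash='0' * HASH_LENGTH, size_bytes=42)
    url = BrowserURL('https://browser.example.com/', instance_name='dev')
    assert url.for_message('action', digest)  # Completes initialization once.
    return url.generate()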

class Condition(threading.Condition):

    """Subclass of ``threading.Condition`` with ``wait`` overridden.

    In this implementation, ``wait`` only releases the lock if other
    threads are actually waiting for the lock, otherwise it does nothing.

    """

    def __init__(self, lock=None):
        super().__init__(lock=lock)
        self.thread_count = 0

    def __enter__(self):
        self.thread_count += 1
        return super().__enter__()

    def __exit__(self, *args):
        self.thread_count -= 1
        self.notify()
        return super().__exit__(*args)

    def wait(self, timeout=None):
        """Wait if other threads are trying to acquire the lock.

        If other threads have attempted to acquire the lock for this Condition
        using ``with``, this method behaves the same as ``wait`` on a regular
        ``threading.Condition``.

        If only one thread has attempted to acquire the lock, then that thread
        must be the current thread. In that case, this method doesn't release
        the lock or wait at all, it simply returns ``True`` as if it had been
        woken by ``threading.Condition.notify``.

        """
        # ``_is_owned`` is a method and must be called; referencing it without
        # parentheses is always truthy, which silently skips the ownership check.
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")

        if self.thread_count > 1:
            return super().wait(timeout)
        return True
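
# Minimal sketch (assumed usage, not from the original file): with a single
# thread holding the Condition, ``wait`` returns immediately instead of
# blocking until the timeout.
def _example_condition():
    cond = Condition()
    with cond:
        woken = cond.wait(timeout=1.0)  # No other waiters: returns True at once.
    return woken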

class TypedEvent:

    """Wrapper around a ``threading.Event`` to support event 'types'."""

    def __init__(self):
        self.event = threading.Event()
        self.history = []

    def set(self, event_type=None):
        self.history.append(event_type)
        self.event.set()

    def clear(self):
        self.event.clear()

    def notify_change(self):
        self.set(event_type=JobEventType.CHANGE)
        self.clear()

    def notify_stop(self):
        self.set(event_type=JobEventType.STOP)
        self.clear()

    def wait(self, last_received=None):
        if last_received is not None:
            next_index = last_received + 1
            if next_index < len(self.history):
                return next_index, self.history[next_index]
        self.event.wait()
        return len(self.history) - 1, self.history[-1]
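
# Sketch of the intended consumer pattern (assumed, not from the original
# file): a watcher passes the index of the last event it saw, and ``wait``
# replays any missed event from ``history`` without blocking.
def _example_typed_event():
    event = TypedEvent()
    event.notify_change()                # Records a JobEventType.CHANGE, then re-arms.
    return event.wait(last_received=-1)  # Replays it: (0, JobEventType.CHANGE).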

class JobState:

    def __init__(self, job):
        self.cancelled = job.cancelled
        self.operation_stage = job.operation_stage

    def __eq__(self, other):
        return (self.cancelled == other.cancelled and
                self.operation_stage == other.operation_stage)

class JobWatchSpec:

    """Structure to track what operations are being watched for a given job.

    This also contains the event used for notifying watchers of changes, and the
    state that the job was in after a change was last detected.

    """

    def __init__(self, job):
        """Instantiate a new JobWatchSpec.

        Args:
            job (buildgrid.server.job.Job): The job that this spec tracks the
                watchers and state for.

        """
        self.event = TypedEvent()
        self.last_state = JobState(job)
        self.operations = {}
        self.operations_lock = threading.Lock()

    @property
    def peers(self):
        with self.operations_lock:
            return [peer for op in self.operations.values()
                    for peer in op["peers"]]

    def peers_for_operation(self, operation_name):
        """Returns a copy of the list of peers for the given operation.

        If the operation is not being watched, or for some reason has no "peers"
        key, the empty list is returned.

        Args:
            operation_name (string): The name of the operation to get the list
                of peers for.

        """
        try:
            return self.operations[operation_name]["peers"].copy()
        except KeyError:
            return []

    def add_peer(self, operation_name, peer):
        """Add a peer to the set of peers watching the job this spec is for.

        Takes an operation name and a peer and tracks that the peer is watching
        the given operation.

        Args:
            operation_name (string): The name of the operation that the peer
                is watching for updates on.
            peer (string): The peer that is starting to watch for updates.

        """
        with self.operations_lock:
            if operation_name in self.operations:
                self.operations[operation_name]["peers"].append(peer)
            else:
                self.operations[operation_name] = {
                    "peers": [peer]
                }

    def remove_peer(self, operation_name, peer):
        """Remove a peer from the list watching an operation for this job.

        The inverse of ``add_peer``. Takes an operation name and a peer and
        removes that peer from the list of peers watching that operation.
        If this leaves the operation with no peers watching it, the operation
        is removed from the ``JobWatchSpec``.

        Args:
            operation_name (string): The name of the operation that is
                no longer being watched by the peer.
            peer (string): The name of the peer that is no longer watching.

        """
        with self.operations_lock:
            if operation_name in self.operations:
                self.operations[operation_name]["peers"].remove(peer)
                if not self.operations[operation_name]["peers"]:
                    self.operations.pop(operation_name)
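
# Assumed usage sketch (the ``job`` argument stands in for a hypothetical
# buildgrid.server.job.Job): peers register against operation names, and the
# spec drops an operation once its last watcher leaves.
def _example_job_watch_spec(job):
    spec = JobWatchSpec(job)
    spec.add_peer('operations/op-1', 'peer-a')
    spec.add_peer('operations/op-1', 'peer-b')
    spec.remove_peer('operations/op-1', 'peer-a')
    return spec.peers  # ['peer-b']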

@contextmanager
def acquire_lock_or_timeout(lock, timeout=DEFAULT_LOCK_ACQUIRE_TIMEOUT):
    result = lock.acquire(timeout=timeout)
    if result:
        try:
            yield result
        finally:
            lock.release()
    else:
        raise TimeoutError(f'Could not acquire lock with timeout=[{timeout}]')
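
# Minimal sketch of the call pattern (assumed, not from the original file):
# the context manager either yields with the lock held, or raises
# TimeoutError without acquiring it.
def _example_acquire_lock_or_timeout():
    lock = threading.Lock()
    with acquire_lock_or_timeout(lock, timeout=5):
        pass  # The lock is held here and released on exit, even on error.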

def get_hostname():
    """Returns the hostname of the machine executing this function.

    Returns:
        str: Hostname for the current machine.
    """
    return socket.gethostname()


def get_hash_type():
    """Returns the hash type."""
    hash_name = HASH().name
    if hash_name == "sha256":
        return remote_execution_pb2.DigestFunction.SHA256
    return remote_execution_pb2.DigestFunction.UNKNOWN

def create_digest(bytes_to_digest):
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a blob is a function of its hash **and** size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given byte data.
    """
    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
                                       size_bytes=len(bytes_to_digest))


def parse_digest(digest_string):
    """Creates a :obj:`Digest` from a digest string.

    A digest string should always be of the form: ``{hash}/{size_bytes}``.

    Args:
        digest_string (str): the digest string.

    Returns:
        :obj:`Digest`: The :obj:`Digest` read from the string, or None if
        `digest_string` is not a valid digest string.
    """
    digest_hash, digest_size = digest_string.split('/')

    if len(digest_hash) == HASH_LENGTH and digest_size.isdigit():
        return remote_execution_pb2.Digest(hash=digest_hash,
                                           size_bytes=int(digest_size))

    return None
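
# Round-trip sketch (assumed, not from the original file): a digest rendered
# as ``{hash}/{size_bytes}`` parses back into an equal Digest message.
def _example_digest_round_trip():
    digest = create_digest(b'hello')
    digest_string = f'{digest.hash}/{digest.size_bytes}'
    return parse_digest(digest_string) == digest  # True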

def read_file(file_path):
    """Loads raw file content in memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file content, until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, 'rb') as byte_file:
        return byte_file.read()


def write_file(file_path, content):
    """Dumps raw memory content to a file.

    Args:
        file_path (str): path to the target file.
        content (bytes): raw file's content.

    Raises:
        OSError: If `file_path` cannot be created or is not writable.
    """
    with open(file_path, 'wb') as byte_file:
        byte_file.write(content)
        byte_file.flush()

def merkle_tree_maker(directory_path):
    """Walks a local folder tree, generating :obj:`FileNode` and
    :obj:`DirectoryNode`.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Yields:
        :obj:`Message`, bytes, str: a tuple of either a :obj:`FileNode` or
        :obj:`DirectoryNode` message, the corresponding blob and the
        corresponding node path.
    """
    directory_name = os.path.basename(directory_path)

    # Actual generator: recursively yields FileNodes and DirectoryNodes.
    def __merkle_tree_maker(directory_path, directory_name):
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directory = remote_execution_pb2.Directory()

        files, directories, symlinks = [], [], []
        for directory_entry in os.scandir(directory_path):
            node_name, node_path = directory_entry.name, directory_entry.path

            if directory_entry.is_file(follow_symlinks=False):
                node_blob = read_file(directory_entry.path)
                node_digest = create_digest(node_blob)

                node = remote_execution_pb2.FileNode()
                node.name = node_name
                node.digest.CopyFrom(node_digest)
                node.is_executable = os.access(node_path, os.X_OK)

                files.append(node)

                yield node, node_blob, node_path

            elif directory_entry.is_dir(follow_symlinks=False):
                node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)

                directories.append(node)

                yield node, node_blob, node_path

            # Create a SymlinkNode:
            elif os.path.islink(directory_entry.path):
                node_target = os.readlink(directory_entry.path)

                node = remote_execution_pb2.SymlinkNode()
                node.name = directory_entry.name
                node.target = node_target

                symlinks.append(node)

        files.sort(key=attrgetter('name'))
        directories.sort(key=attrgetter('name'))
        symlinks.sort(key=attrgetter('name'))

        directory.files.extend(files)
        directory.directories.extend(directories)
        directory.symlinks.extend(symlinks)

        node_blob = directory.SerializeToString()
        node_digest = create_digest(node_blob)

        node = remote_execution_pb2.DirectoryNode()
        node.name = directory_name
        node.digest.CopyFrom(node_digest)

        return node, node_blob, directory_path

    node, node_blob, node_path = yield from __merkle_tree_maker(directory_path,
                                                                directory_name)

    yield node, node_blob, node_path
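
# Consumption sketch (assumed, not from the original file): the generator
# yields leaves before their parent directory, with the root DirectoryNode
# last, so a caller can collect every blob as it is produced.
def _example_merkle_tree_walk(directory_path):
    digest_to_blob = {}
    for node, blob, _path in merkle_tree_maker(directory_path):
        digest_to_blob[(node.digest.hash, node.digest.size_bytes)] = blob
    return node.digest  # Digest of the root Directory message.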

def output_file_maker(file_path, input_path, file_digest):
    """Creates an :obj:`OutputFile` from a local file.

    Note:
        `file_path` **must** point inside or be relative to `input_path`.

    Args:
        file_path (str): absolute or relative path to a local file.
        input_path (str): absolute or relative path to the input root directory.
        file_digest (:obj:`Digest`): the underlying file's digest.

    Returns:
        :obj:`OutputFile`: a new :obj:`OutputFile` object for the file pointed
        to by `file_path`.
    """
    if not os.path.isabs(file_path):
        file_path = os.path.abspath(file_path)
    if not os.path.isabs(input_path):
        input_path = os.path.abspath(input_path)

    output_file = remote_execution_pb2.OutputFile()
    output_file.digest.CopyFrom(file_digest)
    # OutputFile.path should be relative to the working directory:
    output_file.path = os.path.relpath(file_path, start=input_path)
    output_file.is_executable = os.access(file_path, os.X_OK)

    return output_file

def output_directory_maker(directory_path, working_path, tree_digest):
    """Creates an :obj:`OutputDirectory` from a local directory.

    Note:
        `directory_path` **must** point inside or be relative to `working_path`.

    Args:
        directory_path (str): absolute or relative path to a local directory.
        working_path (str): absolute or relative path to the working directory.
        tree_digest (:obj:`Digest`): the underlying folder tree's digest.

    Returns:
        :obj:`OutputDirectory`: a new :obj:`OutputDirectory` for the directory
        pointed to by `directory_path`.
    """
    if not os.path.isabs(directory_path):
        directory_path = os.path.abspath(directory_path)
    if not os.path.isabs(working_path):
        working_path = os.path.abspath(working_path)

    output_directory = remote_execution_pb2.OutputDirectory()
    output_directory.tree_digest.CopyFrom(tree_digest)
    output_directory.path = os.path.relpath(directory_path, start=working_path)

    return output_directory
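
# Usage sketch (assumed paths, not from the original file): building the
# OutputFile entry for a file produced under the action's working directory.
def _example_output_file(working_dir):
    file_path = os.path.join(working_dir, 'out/app.bin')
    file_digest = create_digest(read_file(file_path))
    output_file = output_file_maker(file_path, working_dir, file_digest)
    return output_file.path  # 'out/app.bin', relative to the working directory.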

def convert_values_to_sorted_lists(dictionary: Dict[str, Union[str, Sequence[str]]]) -> None:
    """Given a dictionary, do the following:

    1. Turn strings into singleton lists.
    2. Turn all other sequence types into sorted lists.

    This is a mutating operation. No value is returned.

    """
    for k in dictionary:
        value = dictionary[k]
        if isinstance(value, str):
            dictionary[k] = [value]
        else:
            try:
                dictionary[k] = sorted(list(value))
            except TypeError:
                raise ValueError(f"{value} cannot be sorted")
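
# Quick sketch (assumed, not from the original file): normalizing a platform
# properties mapping before hashing it, so equivalent dictionaries hash
# identically.
def _example_normalize_and_hash():
    properties = {'OSFamily': 'linux', 'ISA': {'x86-64', 'x86-32'}}
    convert_values_to_sorted_lists(properties)
    # properties == {'OSFamily': ['linux'], 'ISA': ['x86-32', 'x86-64']}
    return hash_from_dict(properties)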

def hash_from_dict(dictionary: Dict[str, List[str]]) -> str:
    """Get the hash representation of a dictionary."""
    return hashlib.sha1(json.dumps(dictionary, sort_keys=True).encode()).hexdigest()

def get_unique_objects_by_attribute(objects: Sequence[T], attribute: str) -> Iterable[T]:
    """Yield unique objects based on a hashable attribute or chained attributes.

    Note that this does not provide any sanitization, and any problematic
    elements will only raise exceptions when iterated on.

    """
    attrs_seen = set()

    for obj in objects:
        if obj:
            attr_value = attrgetter(attribute)(obj)
            if attr_value not in attrs_seen:
                attrs_seen.add(attr_value)
                yield obj
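
# Sketch (assumed, not from the original file): deduplicating Digest messages
# by their ``hash`` attribute; ``attribute`` may also be a chained path such
# as 'digest.hash'.
def _example_unique_by_attribute():
    digests = [create_digest(b'a'), create_digest(b'a'), create_digest(b'b')]
    return list(get_unique_objects_by_attribute(digests, 'hash'))  # Two digests.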

def retry_delay(retry_attempt: int, delay_base: int = 1) -> float:
    attempt = min(5, retry_attempt)  # Limit the delay to ~10.5x the base time
    return round(delay_base * (1.6 ** attempt), 1)
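
# Worked values (illustration, not from the original file): with the default
# base of 1 second the backoff is 1.0, 1.6, 2.6, 4.1, 6.6 and then caps at
# 10.5 for every later attempt.
def _example_retry_delays():
    return [retry_delay(attempt) for attempt in range(7)]
    # [1.0, 1.6, 2.6, 4.1, 6.6, 10.5, 10.5]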

def flatten_capabilities(capabilities: Mapping[str, Set[str]]) -> List[Tuple[str, str]]:
    """Flatten a capabilities dictionary.

    This method takes a capabilities dictionary and flattens it into a
    list of key/value tuples describing all the platform properties
    that the capabilities map to. To do this, it assumes that all of the
    dictionary's values are iterable.

    For example,

    ``{'OSFamily': {'Linux'}, 'ISA': {'x86-32', 'x86-64'}}``

    becomes

    ``[('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')]``

    Args:
        capabilities (dict): The capabilities dictionary to flatten.

    Returns:
        list containing the flattened dictionary key-value tuples.

    """
    return [
        (name, value) for name, value_list in capabilities.items()
        for value in value_list
    ]

def combinations_with_unique_keys(iterator: Sequence[Tuple[str, str]], size: int) -> Iterable[Iterable[Tuple]]:
    """Return an iterator of the unique combinations of the input without duplicated keys.

    The input ``iterator`` is a sequence of key-value tuples. This function behaves
    similarly to :func:`itertools.combinations`, except combinations containing
    more than one tuple with the same first element are not included in the result.

    The ``size`` argument specifies how many elements should be included in the
    resulting combinations.

    For example,

    .. code-block:: python

        >>> capabilities = [('OSFamily', 'linux'), ('ISA', 'x86-64'), ('ISA', 'x86-32')]
        >>> platforms = combinations_with_unique_keys(capabilities, 2)
        >>> for item in platforms:
        ...     print(item)
        ...
        (('OSFamily', 'linux'), ('ISA', 'x86-64'))
        (('OSFamily', 'linux'), ('ISA', 'x86-32'))

    Args:
        iterator (list): The list of key-value tuples to return combinations of.
        size (int): How many elements to include in each combination.

    Returns:
        An iterator of the combinations of the input in which each key appears
        at most once.

    """
    def _validate_combination(combination):
        # Exploits the fact that ``set.add`` returns None (falsy) to record
        # keys while scanning for a duplicate in a single pass.
        seen = set()
        return not any(key in seen or seen.add(key) for key, _ in combination)
    yield from filter(_validate_combination, combinations(iterator, size))