Coverage for /builds/BuildGrid/buildgrid/buildgrid/utils.py: 84.62%

221 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from contextlib import contextmanager
from itertools import combinations
from urllib.parse import urljoin
from operator import attrgetter
from typing import AnyStr, Dict, Iterable, IO, List, Mapping, Optional, Sequence, Set, Tuple, TypeVar, Union
import hashlib
import json
import os
import socket
import threading

from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT, DEFAULT_LOCK_ACQUIRE_TIMEOUT
from buildgrid._enums import JobEventType
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


T = TypeVar('T')


secure_uri_schemes = ["https", "grpcs"]
insecure_uri_schemes = ["http", "grpc"]


class BrowserURL:

    __url_markers = (
        '%(instance)s',
        '%(type)s',
        '%(hash)s',
        '%(sizebytes)s',
    )

    def __init__(self, base_url, instance_name=None):
        """Begins browser URL helper initialization."""
        self.__base_url = base_url
        self.__initialized = False
        self.__url_spec = {
            '%(instance)s': instance_name or '',
        }

    def for_message(self, message_type, message_digest):
        """Completes browser URL initialization for a protobuf message."""
        if self.__initialized:
            return False

        self.__url_spec['%(type)s'] = message_type
        self.__url_spec['%(hash)s'] = message_digest.hash
        self.__url_spec['%(sizebytes)s'] = str(message_digest.size_bytes)

        self.__initialized = True
        return True

    def generate(self):
        """Generates a browser URL string."""
        if not self.__base_url or not self.__initialized:
            return None

        url_tail = BROWSER_URL_FORMAT

        for url_marker in self.__url_markers:
            if url_marker not in self.__url_spec:
                return None
            if url_marker not in url_tail:
                continue
            url_tail = url_tail.replace(url_marker, self.__url_spec[url_marker])

        return urljoin(self.__base_url, url_tail)
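
# Illustrative usage of BrowserURL (a sketch; the generated URL depends on the
# configured BROWSER_URL_FORMAT, and `digest` stands for any
# remote_execution_pb2.Digest):
#
#   url = BrowserURL('https://browser.example.com/', instance_name='dev')
#   url.for_message('action', digest)   # -> True on first call, False after
#   url.generate()                      # -> full URL string, or None if the
#                                       #    base URL or a marker is missing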


class TypedEvent:

    """Wrapper around a ``threading.Event`` to support event 'types'"""

    def __init__(self):
        self.event = threading.Event()
        self.history = []

    def set(self, event_type=None):
        self.history.append(event_type)
        self.event.set()

    def clear(self):
        self.event.clear()

    def notify_change(self):
        self.set(event_type=JobEventType.CHANGE)
        self.clear()

    def notify_stop(self):
        self.set(event_type=JobEventType.STOP)
        self.clear()

    def wait(self, last_received=None):
        if last_received is not None:
            next_index = last_received + 1
            if next_index < len(self.history):
                return next_index, self.history[next_index]
        self.event.wait()
        return len(self.history) - 1, self.history[-1]
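
# Illustrative usage of TypedEvent (a sketch): ``wait(last_received=...)`` lets
# a watcher catch up on events it has not yet seen without blocking.
#
#   event = TypedEvent()
#   event.notify_change()                          # history: [JobEventType.CHANGE]
#   index, kind = event.wait(last_received=-1)     # -> (0, JobEventType.CHANGE)
#   index, kind = event.wait(last_received=index)  # no newer event yet: blocks
#                                                  # until the next notify_*()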


class JobState:

    def __init__(self, job):
        self.cancelled = job.cancelled
        self.operation_stage = job.operation_stage

    def __eq__(self, other):
        return (self.cancelled == other.cancelled and
                self.operation_stage == other.operation_stage)


class JobWatchSpec:

    """Structure to track what operations are being watched for a given job.

    This also contains the event used for notifying watchers of changes, and the
    state that the job was in after a change was last detected.

    """

    def __init__(self, job):
        """Instantiate a new JobWatchSpec.

        Args:
            job (buildgrid.server.job.Job): The job that this spec tracks the
                watchers and state for.

        """
        self.event = TypedEvent()
        self.last_state = JobState(job)
        self.operations = {}
        self.operations_lock = threading.Lock()

    @property
    def peers(self):
        with self.operations_lock:
            return [peer for op in self.operations.values()
                    for peer in op["peers"]]

    def peers_for_operation(self, operation_name):
        """Returns a copy of the list of peers for the given operation.

        If the operation is not being watched, or for some reason has no "peers"
        key, the empty list is returned.

        Args:
            operation_name (string): The name of the operation to get the list
                of peers for.

        """
        try:
            return self.operations[operation_name]["peers"].copy()
        except KeyError:
            return []

    def add_peer(self, operation_name, peer):
        """Add a peer to the set of peers watching the job this spec is for.

        Takes an operation name and a peer and tracks that the peer is watching
        the given operation.

        Args:
            operation_name (string): The name of the operation that the peer
                is watching for updates on.
            peer (string): The peer that is starting to watch for updates.

        """
        with self.operations_lock:
            if operation_name in self.operations:
                self.operations[operation_name]["peers"].append(peer)
            else:
                self.operations[operation_name] = {
                    "peers": [peer]
                }

    def remove_peer(self, operation_name, peer):
        """Remove a peer from the list watching an operation for this job.

        The inverse of ``add_peer``. Takes an operation name and a peer and
        removes that peer from the list of peers watching that operation.
        If this leaves the operation with no peers watching it, the operation
        is removed from the ``JobWatchSpec``.

        Args:
            operation_name (string): The name of the operation that is
                no longer being watched by the peer.
            peer (string): The name of the peer that is stopping watching.

        """
        with self.operations_lock:
            if operation_name in self.operations:
                self.operations[operation_name]["peers"].remove(peer)
                if not self.operations[operation_name]["peers"]:
                    self.operations.pop(operation_name)
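
# Illustrative usage of JobWatchSpec (a sketch; `job` stands for any
# buildgrid.server.job.Job):
#
#   spec = JobWatchSpec(job)
#   spec.add_peer('operations/foo', 'peer-1')
#   spec.add_peer('operations/foo', 'peer-2')
#   spec.peers_for_operation('operations/foo')    # -> ['peer-1', 'peer-2']
#   spec.remove_peer('operations/foo', 'peer-1')
#   spec.remove_peer('operations/foo', 'peer-2')  # last watcher gone: the
#                                                 # operation is dropped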


@contextmanager
def acquire_lock_or_timeout(lock, timeout=DEFAULT_LOCK_ACQUIRE_TIMEOUT):
    result = lock.acquire(timeout=timeout)
    if result:
        try:
            yield result
        finally:
            lock.release()
    else:
        raise TimeoutError(f'Could not acquire lock with timeout=[{timeout}]')
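
# Illustrative usage of acquire_lock_or_timeout (a sketch): the context manager
# either yields with the lock held or raises, so callers never block forever.
#
#   lock = threading.Lock()
#   try:
#       with acquire_lock_or_timeout(lock, timeout=5):
#           ...  # critical section; the lock is released on exit
#   except TimeoutError:
#       ...  # the lock stayed contended for the whole timeout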


def get_hostname():
    """Returns the hostname of the machine executing this function.

    Returns:
        str: Hostname for the current machine.
    """
    return socket.gethostname()


def get_hash_type():
    """Returns the hash type."""
    hash_name = HASH().name
    if hash_name == "sha256":
        return remote_execution_pb2.DigestFunction.SHA256
    return remote_execution_pb2.DigestFunction.UNKNOWN


def create_digest(bytes_to_digest):
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a piece of data is a function of its hash **and** size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given byte data.
    """
    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
                                       size_bytes=len(bytes_to_digest))
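
# Example (a doctest-style sketch):
#
#   >>> digest = create_digest(b'hello')
#   >>> digest.size_bytes
#   5
#   >>> len(digest.hash) == HASH_LENGTH
#   True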


def parse_digest(digest_string):
    """Creates a :obj:`Digest` from a digest string.

    A digest string should always be: ``{hash}/{size_bytes}``.

    Args:
        digest_string (str): the digest string.

    Returns:
        :obj:`Digest`: The :obj:`Digest` read from the string or None if
            `digest_string` is not a valid digest string.
    """
    digest_hash, digest_size = digest_string.split('/')

    if len(digest_hash) == HASH_LENGTH and digest_size.isdigit():
        return remote_execution_pb2.Digest(hash=digest_hash,
                                           size_bytes=int(digest_size))

    return None
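
# Example (a doctest-style sketch): round-tripping a digest through its string
# form, and rejecting a string whose hash part has the wrong length.
#
#   >>> digest = create_digest(b'hello')
#   >>> parse_digest(f'{digest.hash}/{digest.size_bytes}') == digest
#   True
#   >>> parse_digest('not-a-hash/5') is None
#   True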


def validate_digest_data(digest: remote_execution_pb2.Digest, data: bytes):
    """ Validate that the given digest corresponds to the given data. """
    return len(data) == digest.size_bytes and HASH(data).hexdigest() == digest.hash


def read_file(file_path):
    """Loads raw file content in memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file's content until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, 'rb') as byte_file:
        return byte_file.read()


def write_file(file_path, content):
    """Dumps raw memory content to a file.

    Args:
        file_path (str): path to the target file.
        content (bytes): raw file's content.

    Raises:
        OSError: If `file_path` does not exist or is not writable.
    """
    with open(file_path, 'wb') as byte_file:
        byte_file.write(content)
        byte_file.flush()


def read_and_rewind(read_head: IO) -> Optional[AnyStr]:
    """Reads from an IO object and returns the data found there
    after rewinding the object to the beginning.

    Args:
        read_head (IO): readable IO head

    Returns:
        AnyStr: readable content from `read_head`.
    """
    if not read_head:
        return None

    data = read_head.read()
    read_head.seek(0)
    return data
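
# Example (a doctest-style sketch):
#
#   >>> import io
#   >>> head = io.BytesIO(b'payload')
#   >>> read_and_rewind(head)
#   b'payload'
#   >>> head.read()  # the head was rewound, so the data can be read again
#   b'payload'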


def merkle_tree_maker(directory_path):
    """Walks a local folder tree, generating :obj:`FileNode` and
    :obj:`DirectoryNode`.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Yields:
        :obj:`Message`, bytes, str: a tuple of either a :obj:`FileNode` or
        :obj:`DirectoryNode` message, the corresponding blob and the
        corresponding node path.
    """
    directory_name = os.path.basename(directory_path)

    # Actual generator, recursively yields FileNodes and DirectoryNodes:
    def __merkle_tree_maker(directory_path, directory_name):
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directory = remote_execution_pb2.Directory()

        files, directories, symlinks = [], [], []
        for directory_entry in os.scandir(directory_path):
            node_name, node_path = directory_entry.name, directory_entry.path

            if directory_entry.is_file(follow_symlinks=False):
                node_blob = read_file(directory_entry.path)
                node_digest = create_digest(node_blob)

                node = remote_execution_pb2.FileNode()
                node.name = node_name
                node.digest.CopyFrom(node_digest)
                node.is_executable = os.access(node_path, os.X_OK)

                files.append(node)

                yield node, node_blob, node_path

            elif directory_entry.is_dir(follow_symlinks=False):
                node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)

                directories.append(node)

                yield node, node_blob, node_path

            # Create a SymlinkNode:
            elif os.path.islink(directory_entry.path):
                node_target = os.readlink(directory_entry.path)

                node = remote_execution_pb2.SymlinkNode()
                node.name = directory_entry.name
                node.target = node_target

                symlinks.append(node)

        files.sort(key=attrgetter('name'))
        directories.sort(key=attrgetter('name'))
        symlinks.sort(key=attrgetter('name'))

        directory.files.extend(files)
        directory.directories.extend(directories)
        directory.symlinks.extend(symlinks)

        node_blob = directory.SerializeToString()
        node_digest = create_digest(node_blob)

        node = remote_execution_pb2.DirectoryNode()
        node.name = directory_name
        node.digest.CopyFrom(node_digest)

        return node, node_blob, directory_path

    node, node_blob, node_path = yield from __merkle_tree_maker(directory_path,
                                                                directory_name)

    yield node, node_blob, node_path
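
# Illustrative usage of merkle_tree_maker (a sketch; '/path/to/input-root' is a
# placeholder): children are yielded before their parent directories, and the
# root DirectoryNode is yielded last, so blobs can be uploaded as they appear.
#
#   for node, blob, path in merkle_tree_maker('/path/to/input-root'):
#       upload_blob(node.digest, blob)  # upload_blob() is hypothetical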


def output_file_maker(file_path, input_path, file_digest):
    """Creates an :obj:`OutputFile` from a local file.

    Note:
        `file_path` **must** point inside or be relative to `input_path`.

    Args:
        file_path (str): absolute or relative path to a local file.
        input_path (str): absolute or relative path to the input root directory.
        file_digest (:obj:`Digest`): the underlying file's digest.

    Returns:
        :obj:`OutputFile`: a new :obj:`OutputFile` object for the file pointed
        to by `file_path`.
    """
    if not os.path.isabs(file_path):
        file_path = os.path.abspath(file_path)
    if not os.path.isabs(input_path):
        input_path = os.path.abspath(input_path)

    output_file = remote_execution_pb2.OutputFile()
    output_file.digest.CopyFrom(file_digest)
    # OutputFile.path should be relative to the working directory
    output_file.path = os.path.relpath(file_path, start=input_path)
    output_file.is_executable = os.access(file_path, os.X_OK)

    return output_file


def output_directory_maker(directory_path, working_path, tree_digest):
    """Creates an :obj:`OutputDirectory` from a local directory.

    Note:
        `directory_path` **must** point inside or be relative to `working_path`.

    Args:
        directory_path (str): absolute or relative path to a local directory.
        working_path (str): absolute or relative path to the working directory.
        tree_digest (:obj:`Digest`): the underlying folder tree's digest.

    Returns:
        :obj:`OutputDirectory`: a new :obj:`OutputDirectory` for the directory
        pointed to by `directory_path`.
    """
    if not os.path.isabs(directory_path):
        directory_path = os.path.abspath(directory_path)
    if not os.path.isabs(working_path):
        working_path = os.path.abspath(working_path)

    output_directory = remote_execution_pb2.OutputDirectory()
    output_directory.tree_digest.CopyFrom(tree_digest)
    output_directory.path = os.path.relpath(directory_path, start=working_path)

    return output_directory


def convert_values_to_sorted_lists(dictionary: Dict[str, Union[str, Sequence[str]]]) -> None:
    """ Given a dictionary, do the following:

    1. Turn strings into singleton lists
    2. Turn all other sequence types into sorted lists with ``sorted()``

    This is a mutating operation. No value is returned.

    """
    for k in dictionary:
        value = dictionary[k]
        if isinstance(value, str):
            dictionary[k] = [value]
        else:
            try:
                dictionary[k] = sorted(list(value))
            except TypeError:
                raise ValueError(f"{value} cannot be sorted")
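
# Example (a doctest-style sketch): strings become singleton lists, other
# sequences become sorted lists, in place.
#
#   >>> d = {'OSFamily': 'linux', 'ISA': ('x86-64', 'x86-32')}
#   >>> convert_values_to_sorted_lists(d)
#   >>> d
#   {'OSFamily': ['linux'], 'ISA': ['x86-32', 'x86-64']}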


def hash_from_dict(dictionary: Dict[str, List[str]]) -> str:
    """ Get the hash representation of a dictionary """
    return hashlib.sha1(json.dumps(dictionary, sort_keys=True).encode()).hexdigest()
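
# Example (a doctest-style sketch): ``sort_keys=True`` makes the hash
# independent of key insertion order.
#
#   >>> hash_from_dict({'a': ['1'], 'b': ['2']}) == hash_from_dict({'b': ['2'], 'a': ['1']})
#   True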


def get_unique_objects_by_attribute(objects: Sequence[T], attribute: str) -> Iterable[T]:
    """ Yield unique objects based on a hashable attribute or chained attributes.

    Note that this does not provide any sanitization, and any problematic elements will
    only raise exceptions when iterated on. """

    attrs_seen = set()

    for obj in objects:
        if obj:
            attr_value = attrgetter(attribute)(obj)
            if attr_value not in attrs_seen:
                attrs_seen.add(attr_value)
                yield obj
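
# Example (a doctest-style sketch): deduplicating digests by their ``hash``
# attribute; dotted paths such as 'digest.hash' also work via attrgetter.
#
#   >>> digests = [create_digest(b'a'), create_digest(b'a'), create_digest(b'b')]
#   >>> len(list(get_unique_objects_by_attribute(digests, 'hash')))
#   2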


def retry_delay(retry_attempt: int, delay_base: int = 1) -> float:
    attempt = min(5, retry_attempt)  # Limit the delay to ~10.5x the base time
    return round(delay_base * (1.6 ** attempt), 1)
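
# Example (a doctest-style sketch): the delay grows geometrically and is capped
# from the sixth attempt onwards.
#
#   >>> [retry_delay(n) for n in range(7)]
#   [1.0, 1.6, 2.6, 4.1, 6.6, 10.5, 10.5]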


def flatten_capabilities(capabilities: Mapping[str, Set[str]]) -> List[Tuple[str, str]]:
    """Flatten a capabilities dictionary.

    This method takes a capabilities dictionary and flattens it into a
    list of key/value tuples describing all the platform properties
    that the capabilities map to. To do this, it assumes that all of the
    dictionary's values are iterable.

    For example,

    ``{'OSFamily': {'Linux'}, 'ISA': {'x86-32', 'x86-64'}}``

    becomes

    ``[('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')]``

    Args:
        capabilities (dict): The capabilities dictionary to flatten.

    Returns:
        list containing the flattened dictionary key-value tuples.

    """
    return [
        (name, value) for name, value_list in capabilities.items()
        for value in value_list
    ]


def combinations_with_unique_keys(iterator: Sequence[Tuple[str, str]], size: int) -> Iterable[Iterable[Tuple]]:
    """Return an iterator of the unique combinations of the input without duplicated keys.

    The input ``iterator`` is a sequence of key-value tuples. This function behaves
    similarly to :func:`itertools.combinations`, except combinations containing
    more than one tuple with the same first element are not included in the result.

    The ``size`` argument specifies how many elements should be included in the
    resulting combinations.

    For example,

    .. code-block:: python

        >>> capabilities = [('OSFamily', 'linux'), ('ISA', 'x86-64'), ('ISA', 'x86-32')]
        >>> platforms = combinations_with_unique_keys(capabilities, 2)
        >>> for item in platforms:
        ...     print(item)
        ...
        (('OSFamily', 'linux'), ('ISA', 'x86-64'))
        (('OSFamily', 'linux'), ('ISA', 'x86-32'))

    Args:
        iterator (list): The list of key-value tuples to return combinations of.
        size (int): How many elements to include in each combination.

    Returns:
        An iterator of the combinations of the input in which each key appears
        at most once.

    """
    def _validate_combination(combination):
        seen = set()
        return not any(key in seen or seen.add(key) for key, _ in combination)
    yield from filter(_validate_combination, combinations(iterator, size))