Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from collections import namedtuple 

17from urllib.parse import urljoin 

18from operator import attrgetter 

19from typing import Dict, Iterable, List, Sequence, TypeVar, Union 

20import hashlib 

21import json 

22import os 

23import socket 

24import threading 

25 

26from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT 

27from buildgrid._enums import JobEventType 

28from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

29 

30 

# Generic type variable, used to type the element-preserving helper
# get_unique_objects_by_attribute() below.
T = TypeVar('T')

32 

33 

class BrowserURL:

    """Two-phase builder for browser URLs pointing at a stored message."""

    # Placeholders that may appear in BROWSER_URL_FORMAT:
    _URL_MARKERS = (
        '%(instance)s',
        '%(type)s',
        '%(hash)s',
        '%(sizebytes)s',
    )

    def __init__(self, base_url, instance_name=None):
        """Begins browser URL helper initialization."""
        self._base_url = base_url
        self._ready = False
        # Substitution values keyed by URL marker; message-specific entries
        # are filled in later by for_message().
        self._substitutions = {'%(instance)s': instance_name or ''}

    def for_message(self, message_type, message_digest):
        """Completes browser URL initialization for a protobuf message.

        Returns True the first time it is called; subsequent calls are
        ignored and return False.
        """
        if self._ready:
            return False

        self._substitutions.update({
            '%(type)s': message_type,
            '%(hash)s': message_digest.hash,
            '%(sizebytes)s': str(message_digest.size_bytes),
        })
        self._ready = True
        return True

    def generate(self):
        """Generates a browser URL string, or None if not fully initialized."""
        if not self._base_url or not self._ready:
            return None

        url_tail = BROWSER_URL_FORMAT
        for marker in self._URL_MARKERS:
            try:
                substitution = self._substitutions[marker]
            except KeyError:
                return None
            if marker in url_tail:
                url_tail = url_tail.replace(marker, substitution)

        return urljoin(self._base_url, url_tail)

78 

79 

class Condition(threading.Condition):

    """Subclass of ``threading.Condition`` with ``wait`` overridden.

    In this implementation, ``wait`` only releases the lock if other
    threads are actually waiting for the lock, otherwise it does nothing.

    """

    def __init__(self, lock=None):
        super().__init__(lock=lock)
        # Number of threads currently inside (or entering) a ``with`` block
        # on this condition; used by wait() to decide whether to block.
        self.thread_count = 0

    def __enter__(self):
        self.thread_count += 1
        return super().__enter__()

    def __exit__(self, *args):
        self.thread_count -= 1
        # Lock is still held here, so notifying is legal.
        self.notify()
        return super().__exit__(*args)

    def wait(self, timeout=None):
        """Wait if other threads are trying to acquire the lock.

        If other threads have attempted to acquire the lock for this Condition
        using ``with``, this method behaves the same as ``wait`` on a regular
        ``threading.Condition``.

        If only one thread has attempted to acquire the lock, then that thread
        must be the current thread. In that case, this method doesn't release
        the lock or wait at all, it simply returns ``True`` as if it had been
        woken by ``threading.Condition.notify``.

        """
        # BUG FIX: ``_is_owned`` is a method. The original code tested the
        # bound-method object itself (``not super()._is_owned``), which is
        # always truthy, so the RuntimeError below could never fire. Call it.
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")

        if self.thread_count > 1:
            return super().wait(timeout)
        return True

121 

122 

class TypedEvent:

    """Wrapper around a ``threading.Event`` to support event 'types'"""

    def __init__(self):
        self.event = threading.Event()
        # Ordered record of every event type ever set on this instance.
        self.history = []

    def set(self, type=None):
        self.history.append(type)
        self.event.set()

    def clear(self):
        self.event.clear()

    def _pulse(self, event_type):
        # Record the event, wake current waiters, then reset immediately.
        self.set(type=event_type)
        self.clear()

    def notify_change(self):
        self._pulse(JobEventType.CHANGE)

    def notify_stop(self):
        self._pulse(JobEventType.STOP)

    def wait(self, last_received=None):
        """Return the next (index, type) pair after ``last_received``.

        If an unseen event is already in the history it is returned
        immediately; otherwise block until the event is set.
        """
        if last_received is not None:
            candidate = last_received + 1
            if candidate < len(self.history):
                return candidate, self.history[candidate]
        self.event.wait()
        latest = len(self.history) - 1
        return latest, self.history[latest]

153 

154 

class JobState:

    """Comparable snapshot of a job's cancellation flag and operation stage."""

    def __init__(self, job):
        self.cancelled = job.cancelled
        self.operation_stage = job.operation_stage

    def __eq__(self, other):
        same_cancellation = self.cancelled == other.cancelled
        same_stage = self.operation_stage == other.operation_stage
        return same_cancellation and same_stage

164 

165 

class JobWatchSpec:

    """Tracks which operations are being watched for a given job.

    Also holds the event used for notifying watchers of changes, and the
    state the job was in after a change was last detected.

    """

    def __init__(self, job):
        """Instantiate a new JobWatchSpec.

        Args:
            job (buildgrid.server.job.Job): The job that this spec tracks the
                watchers and state for.

        """
        self.event = TypedEvent()
        self.last_state = JobState(job)
        # Maps operation name -> {"peers": [peer, ...]}, guarded by the lock.
        self.operations = {}
        self.operations_lock = threading.Lock()

    @property
    def peers(self):
        # Flatten the peers of every watched operation into one list.
        with self.operations_lock:
            all_peers = []
            for watched in self.operations.values():
                all_peers.extend(watched["peers"])
            return all_peers

    def peers_for_operation(self, operation_name):
        """Return a copy of the list of peers for the given operation.

        If the operation is not being watched, or for some reason has no
        "peers" key, the empty list is returned.

        Args:
            operation_name (string): The name of the operation to get the list
                of peers for.

        """
        try:
            return list(self.operations[operation_name]["peers"])
        except KeyError:
            return []

    def add_peer(self, operation_name, peer):
        """Add a peer to the set of peers watching the job this spec is for.

        Takes an operation name and a peer and tracks that the peer is
        watching the given operation.

        Args:
            operation_name (string): The name of the operation that the peer
                is watching for updates on.
            peer (string): The peer that is starting to watch for updates.

        """
        with self.operations_lock:
            watched = self.operations.setdefault(operation_name, {"peers": []})
            watched["peers"].append(peer)

    def remove_peer(self, operation_name, peer):
        """Remove a peer from the list watching an operation for this job.

        The inverse of ``add_peer``. If removing the peer leaves the
        operation with no watchers, the operation is dropped from this
        ``JobWatchSpec``.

        Args:
            operation_name (string): The name of the operation that is
                no longer being watched by the peer.
            peer (string): The name of the peer that is stopping watching.

        """
        with self.operations_lock:
            watched = self.operations.get(operation_name)
            if watched is None:
                return
            watched["peers"].remove(peer)
            if not watched["peers"]:
                del self.operations[operation_name]

249 

250 

# Record of a single failure: 'hash' presumably identifies the affected
# digest/blob and 'type' the kind of failure — confirm against callers.
Failure = namedtuple('Failure', ['hash', 'type'])

252 

253 

def get_hostname():
    """Return the hostname of the machine executing this function.

    Returns:
        str: Hostname for the current machine.
    """
    hostname = socket.gethostname()
    return hostname

261 

262 

def get_hash_type():
    """Map the configured ``HASH`` algorithm to a ``DigestFunction`` value.

    Only sha256 is recognised; anything else maps to UNKNOWN.
    """
    if HASH().name == "sha256":
        return remote_execution_pb2.DigestFunction.SHA256
    return remote_execution_pb2.DigestFunction.UNKNOWN

269 

270 

def create_digest(bytes_to_digest):
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a data is a function of its hash **and** size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given byte data.
    """
    digest_hash = HASH(bytes_to_digest).hexdigest()
    digest_size = len(bytes_to_digest)
    return remote_execution_pb2.Digest(hash=digest_hash, size_bytes=digest_size)

284 

285 

def parse_digest(digest_string):
    """Creates a :obj:`Digest` from a digest string.

    A digest string should always be: ``{hash}/{size_bytes}``.

    Args:
        digest_string (str): the digest string.

    Returns:
        :obj:`Digest`: The :obj:`Digest` read from the string or None if
            `digest_string` is not a valid digest string.
    """
    # BUG FIX: the original unconditionally unpacked ``split('/')``, so any
    # string without exactly one '/' raised ValueError instead of returning
    # None as documented. Use partition() and validate explicitly.
    digest_hash, separator, digest_size = digest_string.partition('/')
    if not separator or '/' in digest_size:
        return None

    if len(digest_hash) == HASH_LENGTH and digest_size.isdigit():
        return remote_execution_pb2.Digest(hash=digest_hash,
                                           size_bytes=int(digest_size))

    return None

305 

306 

def read_file(file_path):
    """Loads raw file content in memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file's content until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, 'rb') as source:
        content = source.read()
    return content

321 

322 

def write_file(file_path, content):
    """Dumps raw memory content to a file.

    Args:
        file_path (str): path to the target file.
        content (bytes): raw file's content.

    Raises:
        OSError: If `file_path` does not exist or is not writable.
    """
    with open(file_path, 'wb') as destination:
        destination.write(content)
        # Flush explicitly so data reaches the OS before the file closes.
        destination.flush()

336 

337 

def merkle_tree_maker(directory_path):
    """Walks a local folder tree, generating :obj:`FileNode` and
    :obj:`DirectoryNode`.

    Nodes are yielded bottom-up: children of a directory are yielded before
    the directory's own :obj:`DirectoryNode`, so a consumer can upload or
    verify blobs in dependency order. The final item yielded is the node for
    the root directory itself.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Yields:
        :obj:`Message`, bytes, str: a tuple of either a :obj:`FileNode` or
            :obj:`DirectoryNode` message, the corresponding blob and the
            corresponding node path.
    """
    directory_name = os.path.basename(directory_path)

    # Actual generator, yields recursively FileNodes and DirectoryNodes:
    def __merkle_tree_maker(directory_path, directory_name):
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directory = remote_execution_pb2.Directory()

        files, directories, symlinks = [], [], []
        for directory_entry in os.scandir(directory_path):
            node_name, node_path = directory_entry.name, directory_entry.path

            # Regular file (follow_symlinks=False, so symlinks to files fall
            # through to the symlink branch below):
            if directory_entry.is_file(follow_symlinks=False):
                node_blob = read_file(directory_entry.path)
                node_digest = create_digest(node_blob)

                node = remote_execution_pb2.FileNode()
                node.name = node_name
                node.digest.CopyFrom(node_digest)
                node.is_executable = os.access(node_path, os.X_OK)

                files.append(node)

                yield node, node_blob, node_path

            # Real sub-directory: recurse, then yield the child's
            # DirectoryNode (its own children were already yielded via the
            # recursive ``yield from``).
            elif directory_entry.is_dir(follow_symlinks=False):
                node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)

                directories.append(node)

                yield node, node_blob, node_path

            # Create a SymlinkNode; note it is recorded in the Directory
            # message but never yielded, since a symlink has no blob.
            elif os.path.islink(directory_entry.path):
                node_target = os.readlink(directory_entry.path)

                node = remote_execution_pb2.SymlinkNode()
                node.name = directory_entry.name
                node.target = node_target

                symlinks.append(node)

            # Any other entry kind (e.g. socket, FIFO) is silently skipped.

        # Sort by name so the serialized Directory message — and therefore
        # its digest — is deterministic for a given tree.
        files.sort(key=attrgetter('name'))
        directories.sort(key=attrgetter('name'))
        symlinks.sort(key=attrgetter('name'))

        directory.files.extend(files)
        directory.directories.extend(directories)
        directory.symlinks.extend(symlinks)

        node_blob = directory.SerializeToString()
        node_digest = create_digest(node_blob)

        node = remote_execution_pb2.DirectoryNode()
        node.name = directory_name
        node.digest.CopyFrom(node_digest)

        # Returned (not yielded) so the caller of ``yield from`` receives
        # this directory's node; the caller decides whether to yield it.
        return node, node_blob, directory_path

    node, node_blob, node_path = yield from __merkle_tree_maker(directory_path,
                                                                directory_name)

    # Finally emit the root directory's own node.
    yield node, node_blob, node_path

414 

415 

def output_file_maker(file_path, input_path, file_digest):
    """Creates an :obj:`OutputFile` from a local file.

    Note:
        `file_path` **must** point inside or be relative to `input_path`.

    Args:
        file_path (str): absolute or relative path to a local file.
        input_path (str): absolute or relative path to the input root directory.
        file_digest (:obj:`Digest`): the underlying file's digest.

    Returns:
        :obj:`OutputFile`: a new :obj:`OutputFile` object for the file pointed
        by `file_path`.
    """
    if not os.path.isabs(file_path):
        file_path = os.path.abspath(file_path)
    if not os.path.isabs(input_path):
        input_path = os.path.abspath(input_path)

    # OutputFile.path should be relative to the working directory:
    relative_path = os.path.relpath(file_path, start=input_path)

    output_file = remote_execution_pb2.OutputFile()
    output_file.digest.CopyFrom(file_digest)
    output_file.path = relative_path
    output_file.is_executable = os.access(file_path, os.X_OK)

    return output_file

443 

444 

def output_directory_maker(directory_path, working_path, tree_digest):
    """Creates an :obj:`OutputDirectory` from a local directory.

    Note:
        `directory_path` **must** point inside or be relative to
        `working_path`.

    Args:
        directory_path (str): absolute or relative path to a local directory.
        working_path (str): absolute or relative path to the working directory.
        tree_digest (:obj:`Digest`): the underlying folder tree's digest.

    Returns:
        :obj:`OutputDirectory`: a new :obj:`OutputDirectory` for the directory
        pointed by `directory_path`.
    """
    # DOC FIX: the Note above previously referred to ``input_path``, a
    # parameter this function does not have; the constraint is against
    # ``working_path``, which is what relpath() below is computed against.
    if not os.path.isabs(directory_path):
        directory_path = os.path.abspath(directory_path)
    if not os.path.isabs(working_path):
        working_path = os.path.abspath(working_path)

    output_directory = remote_execution_pb2.OutputDirectory()
    output_directory.tree_digest.CopyFrom(tree_digest)
    # OutputDirectory.path is relative to the working directory:
    output_directory.path = os.path.relpath(directory_path, start=working_path)

    return output_directory

470 

471 

def convert_values_to_sorted_lists(dictionary: Dict[str, Union[str, Sequence[str]]]) -> None:
    """Normalize every value of `dictionary` in place:

    1. Turn strings into singleton lists
    2. Turn all other sequence types into sorted lists

    This is a mutating operation. No value is returned.

    Raises:
        ValueError: If a value's elements cannot be ordered.

    """
    for key, value in dictionary.items():
        if isinstance(value, str):
            dictionary[key] = [value]
        else:
            try:
                # sorted() already returns a fresh list; the original's
                # extra list(value) copy was redundant.
                dictionary[key] = sorted(value)
            except TypeError as error:
                # Chain the cause explicitly so tracebacks show why the
                # sort failed.
                raise ValueError(f"{value} cannot be sorted") from error

490 

491 

def hash_from_dict(dictionary: Dict[str, List[str]]) -> str:
    """Get the hash representation of a dictionary.

    Keys are sorted before serialization, so two dicts with the same
    contents hash identically regardless of insertion order.
    """
    serialized = json.dumps(dictionary, sort_keys=True)
    return hashlib.sha1(serialized.encode()).hexdigest()

495 

496 

def get_unique_objects_by_attribute(objects: Sequence[T], attribute: str) -> Iterable[T]:
    """Yield objects that are unique by a (possibly dotted) attribute path.

    Falsy objects are skipped. No sanitization is performed: problematic
    elements only raise when the returned generator is iterated.
    """
    seen = set()
    get_value = attrgetter(attribute)

    for candidate in objects:
        if not candidate:
            continue
        value = get_value(candidate)
        if value in seen:
            continue
        seen.add(value)
        yield candidate