Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/service.py: 61.15%

260 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17CAS services 

18================== 

19 

20Implements the Content Addressable Storage API and ByteStream API. 

21""" 

22 

23from functools import partial 

24import itertools 

25import logging 

26import re 

27 

28import grpc 

29 

30import buildgrid.server.context as context_module 

31 

32from buildgrid._enums import ByteStreamResourceType 

33from buildgrid._exceptions import ( 

34 InvalidArgumentError, 

35 NotFoundError, 

36 OutOfRangeError, 

37 PermissionDeniedError, 

38 RetriableError, 

39 StorageFullError) 

40 

41from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc 

42from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

43from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

44from buildgrid.server._authentication import AuthContext, authorize 

45from buildgrid.server.metrics_utils import ( 

46 DurationMetric, 

47 generator_method_duration_metric 

48) 

49from buildgrid.server.request_metadata_utils import printable_request_metadata 

50from buildgrid.server.metrics_names import ( 

51 CAS_FIND_MISSING_BLOBS_TIME_METRIC_NAME, 

52 CAS_BATCH_UPDATE_BLOBS_TIME_METRIC_NAME, 

53 CAS_BATCH_READ_BLOBS_TIME_METRIC_NAME, 

54 CAS_GET_TREE_TIME_METRIC_NAME, 

55 CAS_BYTESTREAM_READ_TIME_METRIC_NAME, 

56 CAS_BYTESTREAM_WRITE_TIME_METRIC_NAME) 

57 

58 

class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    """gRPC servicer for the Content Addressable Storage (CAS) API.

    Dispatches each RPC to the storage instance named in the request's
    ``instance_name`` field, translating BuildGrid exceptions into the
    appropriate gRPC status codes on the call context.
    """

    def __init__(self, server):
        """Create the servicer and register it with ``server``.

        Args:
            server: the gRPC server to attach this servicer to.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> CAS instance implementation.
        self._instances = {}

        remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register a CAS instance under ``name`` for request dispatch."""
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @context_module.metadatacontext()
    @authorize(AuthContext)
    @DurationMetric(CAS_FIND_MISSING_BLOBS_TIME_METRIC_NAME)
    def FindMissingBlobs(self, request, context):
        """Return the subset of ``request.blob_digests`` missing from storage.

        On error, sets an error status code on ``context`` and returns an
        empty ``FindMissingBlobsResponse``.
        """
        self.__logger.info(f"FindMissingBlobs request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            response = instance.find_missing_blobs(request.blob_digests)

            return response

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in FindMissingBlobs; request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return remote_execution_pb2.FindMissingBlobsResponse()

    @context_module.metadatacontext()
    @authorize(AuthContext)
    @DurationMetric(CAS_BATCH_UPDATE_BLOBS_TIME_METRIC_NAME)
    def BatchUpdateBlobs(self, request, context):
        """Store a batch of blobs in the named instance.

        On error, sets an error status code on ``context`` and returns an
        empty ``BatchUpdateBlobsResponse``.
        """
        self.__logger.info(f"BatchUpdateBlobs request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            response = instance.batch_update_blobs(request.requests)

            return response

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except PermissionDeniedError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            # Log the digests but not the data:
            printable_request = {'instance_name': request.instance_name,
                                 'digests': [r.digest for r in request.requests]}

            # Use .exception (not .info) so the traceback is recorded,
            # consistent with the other unexpected-error handlers here.
            self.__logger.exception(
                f"Unexpected error in BatchUpdateBlobs; request=[{printable_request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        # Fixed: previously returned a BatchReadBlobsResponse, which is the
        # wrong message type for this RPC.
        return remote_execution_pb2.BatchUpdateBlobsResponse()

    @context_module.metadatacontext()
    @authorize(AuthContext)
    @DurationMetric(CAS_BATCH_READ_BLOBS_TIME_METRIC_NAME)
    def BatchReadBlobs(self, request, context):
        """Read a batch of blobs from the named instance.

        On error, sets an error status code on ``context`` and returns an
        empty ``BatchReadBlobsResponse``.
        """
        self.__logger.info(f"BatchReadBlobs request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            response = instance.batch_read_blobs(request.digests)
            return response

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except PermissionDeniedError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in BatchReadBlobs; request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return remote_execution_pb2.BatchReadBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_GET_TREE_TIME_METRIC_NAME)
    def GetTree(self, request, context):
        """Stream the directory tree rooted at the requested digest.

        Yields ``GetTreeResponse`` messages from the instance; on error,
        sets an error status code on ``context`` and yields a single empty
        response.
        """
        self.__logger.info(f"GetTree request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            yield from instance.get_tree(request)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in GetTree; request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        yield remote_execution_pb2.GetTreeResponse()

    # --- Private API ---

    def _get_instance(self, instance_name):
        """Look up a registered instance by name.

        Raises:
            InvalidArgumentError: if no instance is registered under
                ``instance_name``.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]")

221 

222 

class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    """gRPC servicer for the ByteStream API.

    Serves both CAS blob reads/writes and LogStream reads/writes, telling
    them apart by matching the request's resource name against the regexes
    below and dispatching to the matching registered instance.
    """

    # CAS read name format: "{instance_name}/blobs/{hash}/{size}"
    CAS_READ_REGEX = '^(.*?)/?(blobs/.*/[0-9]*)$'
    # CAS write name format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}"
    # NOTE: No `$` here since we deliberately support extra data on the end of this
    # resource name.
    CAS_WRITE_REGEX = '^(.*?)/?(uploads/.*/blobs/.*/[0-9]*)'

    # LogStream read name format: "{instance_name}/{parent}/logStreams/{name}"
    LOGSTREAM_READ_REGEX = '^(.*?)/?([^/]*/logStreams/.*)$'
    # LogStream write name format: "{instance_name}/{parent}/logStreams/{name}/{token}"
    LOGSTREAM_WRITE_REGEX = '^(.*?)/?([^/]*/logStreams/.*/.*)$'

    def __init__(self, server):
        """Create the servicer and register it with ``server``.

        Args:
            server: the gRPC server to attach this servicer to.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> ByteStream-capable instance implementation.
        self._instances = {}

        bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register an instance under ``name`` for resource-name dispatch."""
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @context_module.metadatacontext()
    @authorize(AuthContext)
    @generator_method_duration_metric(CAS_BYTESTREAM_READ_TIME_METRIC_NAME)
    def Read(self, request, context):
        """Stream the contents of a CAS blob or LogStream to the client.

        Yields ``ReadResponse`` chunks; on error, sets an error status code
        on ``context`` and (for argument/lookup errors) yields a single
        empty response.
        """
        self.__logger.info(f"Read request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance, resource, res_type = self._parse_resource_name(
                request.resource_name,
                cas_regex=self.CAS_READ_REGEX,
                logstream_regex=self.LOGSTREAM_READ_REGEX
            )
            if res_type == ByteStreamResourceType.CAS:
                # resource is "blobs/{hash}/{size}".
                blob_details = resource.split('/')
                hash_, size_bytes = blob_details[1], blob_details[2]

                yield from instance.read_cas_blob(
                    hash_, size_bytes, request.read_offset, request.read_limit)

            elif res_type == ByteStreamResourceType.LOGSTREAM:
                # Make sure the reader is cleanly unregistered if the client
                # disconnects mid-stream.
                context.add_callback(partial(instance.disconnect_logstream_reader,
                                             resource))
                yield from instance.read_logstream(resource, context)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield bytestream_pb2.ReadResponse()

        except NotFoundError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.NOT_FOUND)
            yield bytestream_pb2.ReadResponse()

        except OutOfRangeError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
            yield bytestream_pb2.ReadResponse()

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in ByteStreamRead; request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

    @context_module.metadatacontext()
    @authorize(AuthContext)
    @DurationMetric(CAS_BYTESTREAM_WRITE_TIME_METRIC_NAME)
    def Write(self, request_iterator, context):
        """Write a CAS blob or LogStream from a stream of client requests.

        The first request carries the resource name; it is re-chained in
        front of the remaining requests when forwarded to the instance.
        On error, sets an error status code on ``context`` and returns an
        empty ``WriteResponse``.
        """
        self.__logger.info(f"Write request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        # Guard against an empty request stream: a bare next() here would
        # raise StopIteration and surface as an internal error rather than
        # a client error.
        request = next(request_iterator, None)
        if request is None:
            context.set_details("Empty request stream")
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            return bytestream_pb2.WriteResponse()

        try:
            instance, resource, res_type = self._parse_resource_name(
                request.resource_name,
                cas_regex=self.CAS_WRITE_REGEX,
                logstream_regex=self.LOGSTREAM_WRITE_REGEX,
            )
            if res_type == ByteStreamResourceType.CAS:
                # resource is "uploads/{uuid}/blobs/{hash}/{size}".
                blob_details = resource.split('/')
                _, hash_, size_bytes = blob_details[1], blob_details[3], blob_details[4]
                return instance.write_cas_blob(
                    hash_, size_bytes, itertools.chain([request], request_iterator))

            elif res_type == ByteStreamResourceType.LOGSTREAM:
                return instance.write_logstream(resource, request, request_iterator)

        except NotImplementedError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except NotFoundError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.NOT_FOUND)

        except PermissionDeniedError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        except StorageFullError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            # Log all the fields except `data`:
            printable_request = {'resource_name': request.resource_name,
                                 'write_offset': request.write_offset,
                                 'finish_write': request.finish_write}

            self.__logger.exception(
                f"Unexpected error in ByteStreamWrite; request=[{printable_request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return bytestream_pb2.WriteResponse()

    @authorize(AuthContext)
    def QueryWriteStatus(self, request, context):
        """Report the status of an in-progress write.

        Unimplemented for CAS resources; delegated to the instance for
        LogStream resources. On error, sets an error status code on
        ``context`` and returns an empty ``QueryWriteStatusResponse``.
        """
        self.__logger.info(f"QueryWriteStatus request from [{context.peer()}]")

        try:
            instance, resource, res_type = self._parse_resource_name(
                request.resource_name,
                cas_regex=self.CAS_WRITE_REGEX,
                logstream_regex=self.LOGSTREAM_WRITE_REGEX,
            )
            if res_type == ByteStreamResourceType.CAS:
                context.set_code(grpc.StatusCode.UNIMPLEMENTED)
                context.set_details('Method not implemented!')
            elif res_type == ByteStreamResourceType.LOGSTREAM:
                return instance.query_logstream_status(resource, context)

        except NotImplementedError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except NotFoundError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.NOT_FOUND)

        except PermissionDeniedError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        except StorageFullError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)

        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in ByteStreamQueryWriteStatus; request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return bytestream_pb2.QueryWriteStatusResponse()

    # --- Private API ---

    def _parse_resource_name(self, resource_name, cas_regex='', logstream_regex=''):
        """Split a resource name into (instance, resource, resource type).

        CAS patterns take precedence over LogStream patterns when both match.

        Raises:
            InvalidArgumentError: if the name matches neither pattern, or
                names an unregistered instance.
        """
        res_type = None
        cas_match = re.match(cas_regex, resource_name)
        logstream_match = re.match(logstream_regex, resource_name)
        if cas_match:
            instance_name = cas_match[1]
            resource_name = cas_match[2]
            res_type = ByteStreamResourceType.CAS

        elif logstream_match:
            instance_name = logstream_match[1]
            resource_name = logstream_match[2]
            res_type = ByteStreamResourceType.LOGSTREAM

        else:
            raise InvalidArgumentError(
                f"Invalid resource name: [{resource_name}]")

        instance = self._get_instance(instance_name)
        return instance, resource_name, res_type

    def _get_instance(self, instance_name):
        """Look up a registered instance by name.

        Raises:
            InvalidArgumentError: if no instance is registered under
                ``instance_name``.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]")