Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/service.py: 61.69%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

248 statements  

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17CAS services 

18================== 

19 

20Implements the Content Addressable Storage API and ByteStream API. 

21""" 

22 

23from functools import partial 

24import itertools 

25import logging 

26import re 

27 

28import grpc 

29 

30 

31from buildgrid._enums import ByteStreamResourceType 

32from buildgrid._exceptions import ( 

33 InvalidArgumentError, 

34 NotFoundError, 

35 OutOfRangeError, 

36 PermissionDeniedError, 

37 RetriableError, 

38 StorageFullError) 

39 

40from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc 

41from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

42from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

43from buildgrid.server._authentication import AuthContext, authorize 

44from buildgrid.server.metrics_utils import ( 

45 DurationMetric, 

46 generator_method_duration_metric 

47) 

48from buildgrid.server.request_metadata_utils import printable_request_metadata 

49from buildgrid.server.metrics_names import ( 

50 CAS_FIND_MISSING_BLOBS_TIME_METRIC_NAME, 

51 CAS_BATCH_UPDATE_BLOBS_TIME_METRIC_NAME, 

52 CAS_BATCH_READ_BLOBS_TIME_METRIC_NAME, 

53 CAS_GET_TREE_TIME_METRIC_NAME, 

54 CAS_BYTESTREAM_READ_TIME_METRIC_NAME, 

55 CAS_BYTESTREAM_WRITE_TIME_METRIC_NAME) 

56 

57 

class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    """gRPC servicer implementing the remote-execution ContentAddressableStorage API.

    Each RPC is routed to a storage instance registered via :meth:`add_instance`,
    keyed by the request's ``instance_name``. BuildGrid exceptions are translated
    into the corresponding gRPC status codes on the call context.
    """

    def __init__(self, server):
        """Attach this servicer to ``server``.

        Args:
            server: The gRPC server to register this servicer with.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> CAS instance implementation.
        self._instances = {}

        remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register a CAS ``instance`` to serve requests addressed to ``name``."""
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(CAS_FIND_MISSING_BLOBS_TIME_METRIC_NAME)
    def FindMissingBlobs(self, request, context):
        """Report which of the requested blob digests are absent from storage.

        Returns an empty ``FindMissingBlobsResponse`` after setting an error
        status code on ``context`` if the request cannot be served.
        """
        self.__logger.info(f"FindMissingBlobs request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            response = instance.find_missing_blobs(request.blob_digests)

            return response

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in FindMissingBlobs; request=[{request}]"
            )
            # Consistent with BatchReadBlobs: surface the error text to the client.
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return remote_execution_pb2.FindMissingBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_BATCH_UPDATE_BLOBS_TIME_METRIC_NAME)
    def BatchUpdateBlobs(self, request, context):
        """Store a batch of blobs, returning a per-blob status response.

        Returns an empty ``BatchUpdateBlobsResponse`` after setting an error
        status code on ``context`` if the request cannot be served.
        """
        self.__logger.info(f"BatchUpdateBlobs request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            response = instance.batch_update_blobs(request.requests)

            return response

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except PermissionDeniedError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            # Log the digests but not the data:
            printable_request = {'instance_name': request.instance_name,
                                 'digests': [r.digest for r in request.requests]}

            # BUGFIX: use logger.exception (was logger.info) so the traceback
            # is recorded, matching the other unexpected-error handlers.
            self.__logger.exception(
                f"Unexpected error in BatchUpdateBlobs; request=[{printable_request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        # BUGFIX: previously returned a BatchReadBlobsResponse, which is the
        # wrong message type for this RPC.
        return remote_execution_pb2.BatchUpdateBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_BATCH_READ_BLOBS_TIME_METRIC_NAME)
    def BatchReadBlobs(self, request, context):
        """Fetch a batch of blobs by digest, returning a per-blob response.

        Returns an empty ``BatchReadBlobsResponse`` after setting an error
        status code on ``context`` if the request cannot be served.
        """
        self.__logger.info(f"BatchReadBlobs request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            response = instance.batch_read_blobs(request.digests)
            return response

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except PermissionDeniedError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception as e:
            self.__logger.exception(
                f"Unexpected error in BatchReadBlobs; request=[{request}]"
            )
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INTERNAL)

        return remote_execution_pb2.BatchReadBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_GET_TREE_TIME_METRIC_NAME)
    def GetTree(self, request, context):
        """Stream the directory tree rooted at the requested digest.

        Yields ``GetTreeResponse`` messages from the instance. On error, sets a
        status code on ``context`` and yields a single empty response so the
        client-side stream terminates cleanly.
        """
        self.__logger.info(f"GetTree request from [{context.peer()}] "
                           f"([{printable_request_metadata(context.invocation_metadata())}])")

        try:
            instance = self._get_instance(request.instance_name)
            yield from instance.get_tree(request)
            # BUGFIX: return on success — previously an extra empty
            # GetTreeResponse was appended to every successful stream.
            return

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in GetTree; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        yield remote_execution_pb2.GetTreeResponse()

    # --- Private API ---

    def _get_instance(self, instance_name):
        """Look up a registered instance by name.

        Raises:
            InvalidArgumentError: If ``instance_name`` is not registered.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]")

214 

215 

class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    """gRPC servicer implementing the ByteStream API for CAS blobs and LogStreams.

    Resource names are parsed against the class-level regexes to determine both
    the target instance and whether the resource is a CAS blob or a LogStream.
    """

    # CAS read name format: "{instance_name}/blobs/{hash}/{size}"
    CAS_READ_REGEX = '^(.*?)/?(blobs/.*/[0-9]*)$'
    # CAS write name format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}"
    # NOTE: No `$` here since we deliberately support extra data on the end of this
    # resource name.
    CAS_WRITE_REGEX = '^(.*?)/?(uploads/.*/blobs/.*/[0-9]*)'

    # LogStream read name format: "{instance_name}/{parent}/logStreams/{name}"
    LOGSTREAM_READ_REGEX = '^(.*?)/?([^/]*/logStreams/.*)$'
    # LogStream write name format: "{instance_name}/{parent}/logStreams/{name}/{token}"
    LOGSTREAM_WRITE_REGEX = '^(.*?)/?([^/]*/logStreams/.*/.*)$'

    def __init__(self, server):
        """Attach this servicer to ``server``.

        Args:
            server: The gRPC server to register this servicer with.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> ByteStream instance implementation.
        self._instances = {}

        bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register a ByteStream ``instance`` to serve requests addressed to ``name``."""
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @generator_method_duration_metric(CAS_BYTESTREAM_READ_TIME_METRIC_NAME)
    def Read(self, request, context):
        """Stream the contents of a CAS blob or a LogStream to the client.

        Yields ``ReadResponse`` chunks. On error, sets a status code on
        ``context`` and (for most errors) yields one empty response so the
        stream terminates cleanly.
        """
        self.__logger.info(f"Read request from [{context.peer()}]")

        try:
            instance, resource, res_type = self._parse_resource_name(
                request.resource_name,
                cas_regex=self.CAS_READ_REGEX,
                logstream_regex=self.LOGSTREAM_READ_REGEX
            )
            if res_type == ByteStreamResourceType.CAS:
                # Resource is "blobs/{hash}/{size}".
                blob_details = resource.split('/')
                hash_, size_bytes = blob_details[1], blob_details[2]

                yield from instance.read_cas_blob(
                    hash_, size_bytes, request.read_offset, request.read_limit)

            elif res_type == ByteStreamResourceType.LOGSTREAM:
                # Ensure the reader is unregistered if the client disconnects.
                context.add_callback(partial(instance.disconnect_logstream_reader,
                                             resource))
                yield from instance.read_logstream(resource, context)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield bytestream_pb2.ReadResponse()

        except NotFoundError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.NOT_FOUND)
            yield bytestream_pb2.ReadResponse()

        except OutOfRangeError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
            yield bytestream_pb2.ReadResponse()

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in ByteStreamRead; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

    @authorize(AuthContext)
    @DurationMetric(CAS_BYTESTREAM_WRITE_TIME_METRIC_NAME)
    def Write(self, request_iterator, context):
        """Consume a stream of write requests into a CAS blob or LogStream.

        The first request carries the resource name; the full stream (first
        request included) is forwarded to the target instance. Returns a
        ``WriteResponse``, empty when an error status was set on ``context``.
        """
        self.__logger.info(f"Write request from [{context.peer()}]")

        # BUGFIX: guard against an empty request stream — previously the bare
        # next() raised StopIteration, surfacing as an opaque internal error
        # instead of a clean INVALID_ARGUMENT.
        try:
            request = next(request_iterator)
        except StopIteration:
            context.set_details("Empty write request stream")
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            return bytestream_pb2.WriteResponse()

        try:
            instance, resource, res_type = self._parse_resource_name(
                request.resource_name,
                cas_regex=self.CAS_WRITE_REGEX,
                logstream_regex=self.LOGSTREAM_WRITE_REGEX,
            )
            if res_type == ByteStreamResourceType.CAS:
                # Resource is "uploads/{uuid}/blobs/{hash}/{size}".
                blob_details = resource.split('/')
                _, hash_, size_bytes = blob_details[1], blob_details[3], blob_details[4]
                # Re-attach the first request to the front of the stream.
                return instance.write_cas_blob(
                    hash_, size_bytes, itertools.chain([request], request_iterator))

            elif res_type == ByteStreamResourceType.LOGSTREAM:
                return instance.write_logstream(resource, request, request_iterator)

        except NotImplementedError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except NotFoundError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.NOT_FOUND)

        except PermissionDeniedError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        except StorageFullError as e:
            self.__logger.exception(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            # Log all the fields except `data`:
            printable_request = {'resource_name': request.resource_name,
                                 'write_offset': request.write_offset,
                                 'finish_write': request.finish_write}

            self.__logger.exception(
                f"Unexpected error in ByteStreamWrite; request=[{printable_request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return bytestream_pb2.WriteResponse()

    @authorize(AuthContext)
    def QueryWriteStatus(self, request, context):
        """Report the committed size of an in-progress write.

        Only implemented for LogStream resources; CAS resources receive
        ``UNIMPLEMENTED``. Returns an empty ``QueryWriteStatusResponse`` when
        an error status was set on ``context``.
        """
        self.__logger.info(f"QueryWriteStatus request from [{context.peer()}]")

        try:
            instance, resource, res_type = self._parse_resource_name(
                request.resource_name,
                cas_regex=self.CAS_WRITE_REGEX,
                logstream_regex=self.LOGSTREAM_WRITE_REGEX,
            )
            if res_type == ByteStreamResourceType.CAS:
                context.set_code(grpc.StatusCode.UNIMPLEMENTED)
                context.set_details('Method not implemented!')
            elif res_type == ByteStreamResourceType.LOGSTREAM:
                return instance.query_logstream_status(resource, context)

        except NotImplementedError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        except InvalidArgumentError as e:
            self.__logger.info(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

        except NotFoundError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.NOT_FOUND)

        except PermissionDeniedError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        except StorageFullError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)

        # Attempt to catch postgres connection failures and instruct clients to retry
        except RetriableError as e:
            self.__logger.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
            context.abort_with_status(e.error_status)

        except Exception:
            self.__logger.exception(
                f"Unexpected error in ByteStreamQueryWriteStatus; request=[{request}]"
            )
            context.set_code(grpc.StatusCode.INTERNAL)

        return bytestream_pb2.QueryWriteStatusResponse()

    # --- Private API ---

    def _parse_resource_name(self, resource_name, cas_regex='', logstream_regex=''):
        """Split ``resource_name`` into (instance, resource, resource type).

        Tries the CAS pattern first, then the LogStream pattern.

        Raises:
            InvalidArgumentError: If neither pattern matches, or the instance
                name is not registered.
        """
        res_type = None
        cas_match = re.match(cas_regex, resource_name)
        logstream_match = re.match(logstream_regex, resource_name)
        if cas_match:
            instance_name = cas_match[1]
            resource_name = cas_match[2]
            res_type = ByteStreamResourceType.CAS

        elif logstream_match:
            instance_name = logstream_match[1]
            resource_name = logstream_match[2]
            res_type = ByteStreamResourceType.LOGSTREAM

        else:
            raise InvalidArgumentError(
                f"Invalid resource name: [{resource_name}]")

        instance = self._get_instance(instance_name)
        return instance, resource_name, res_type

    def _get_instance(self, instance_name):
        """Look up a registered instance by name.

        Raises:
            InvalidArgumentError: If ``instance_name`` is not registered.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]")