Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17CAS services 

18================== 

19 

20Implements the Content Addressable Storage API and ByteStream API. 

21""" 

22 

23import logging 

24import grpc 

25 

26from buildgrid._enums import MetricRecordDomain 

27from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError, PermissionDeniedError 

28from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc 

29from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

30from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc 

31from buildgrid.server._authentication import AuthContext, authorize 

32from buildgrid.server.metrics_utils import DurationMetric 

33from buildgrid.server.request_metadata_utils import printable_request_metadata 

34from buildgrid.server.metrics_names import ( 

35 CAS_FIND_MISSING_BLOBS_TIME_METRIC_NAME, 

36 CAS_BATCH_UPDATE_BLOBS_TIME_METRIC_NAME, 

37 CAS_BATCH_READ_BLOBS_TIME_METRIC_NAME, 

38 CAS_GET_TREE_TIME_METRIC_NAME, 

39 CAS_BYTESTREAM_READ_TIME_METRIC_NAME, 

40 CAS_BYTESTREAM_WRITE_TIME_METRIC_NAME) 

41 

42 

class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    """gRPC servicer for the Content Addressable Storage API.

    Each request is dispatched to the instance registered under the
    request's ``instance_name``. The error types handled by each method are
    logged and reported on the gRPC context; the method then answers with an
    empty response message of its own response type.
    """

    def __init__(self, server):
        """Create the servicer and register it with ``server``.

        Args:
            server: gRPC server this servicer is added to.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> instance object, filled by add_instance().
        self._instances = {}

        remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register ``instance`` under ``name``, replacing any previous entry."""
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(CAS_FIND_MISSING_BLOBS_TIME_METRIC_NAME, MetricRecordDomain.CAS)
    def FindMissingBlobs(self, request, context):
        """Forward ``request.blob_digests`` to the instance's ``find_missing_blobs``.

        Returns:
            The instance's response, or an empty ``FindMissingBlobsResponse``
            with status ``INVALID_ARGUMENT`` set on ``context`` when an
            ``InvalidArgumentError`` is raised.
        """
        self.__logger.debug(f"FindMissingBlobs request from [{context.peer()}] "
                            f"([{printable_request_metadata(context)}])")

        try:
            instance = self._get_instance(request.instance_name)
            return instance.find_missing_blobs(request.blob_digests)

        except InvalidArgumentError as e:
            self._set_error(context, e, grpc.StatusCode.INVALID_ARGUMENT)

        return remote_execution_pb2.FindMissingBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_BATCH_UPDATE_BLOBS_TIME_METRIC_NAME, MetricRecordDomain.CAS)
    def BatchUpdateBlobs(self, request, context):
        """Forward ``request.requests`` to the instance's ``batch_update_blobs``.

        Returns:
            The instance's response, or an empty ``BatchUpdateBlobsResponse``
            with status ``INVALID_ARGUMENT`` or ``PERMISSION_DENIED`` set on
            ``context`` when the matching error is raised.
        """
        self.__logger.debug(f"BatchUpdateBlobs request from [{context.peer()}] "
                            f"([{printable_request_metadata(context)}])")

        try:
            instance = self._get_instance(request.instance_name)
            return instance.batch_update_blobs(request.requests)

        except InvalidArgumentError as e:
            self._set_error(context, e, grpc.StatusCode.INVALID_ARGUMENT)

        except PermissionDeniedError as e:
            self._set_error(context, e, grpc.StatusCode.PERMISSION_DENIED)

        # The fallback must be this RPC's own response type; a
        # BatchReadBlobsResponse does not match the method's declared output.
        return remote_execution_pb2.BatchUpdateBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_BATCH_READ_BLOBS_TIME_METRIC_NAME, MetricRecordDomain.CAS)
    def BatchReadBlobs(self, request, context):
        """Forward ``request.digests`` to the instance's ``batch_read_blobs``.

        Returns:
            The instance's response, or an empty ``BatchReadBlobsResponse``
            with status ``INVALID_ARGUMENT`` or ``PERMISSION_DENIED`` set on
            ``context`` when the matching error is raised.
        """
        self.__logger.debug(f"BatchReadBlobs request from [{context.peer()}] "
                            f"([{printable_request_metadata(context)}])")

        try:
            instance = self._get_instance(request.instance_name)
            return instance.batch_read_blobs(request.digests)

        except InvalidArgumentError as e:
            self._set_error(context, e, grpc.StatusCode.INVALID_ARGUMENT)

        except PermissionDeniedError as e:
            self._set_error(context, e, grpc.StatusCode.PERMISSION_DENIED)

        return remote_execution_pb2.BatchReadBlobsResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_GET_TREE_TIME_METRIC_NAME, MetricRecordDomain.CAS)
    def GetTree(self, request, context):
        """Stream whatever the instance's ``get_tree(request)`` yields.

        Yields:
            The items produced by the instance. When an
            ``InvalidArgumentError`` is raised, status ``INVALID_ARGUMENT``
            is set on ``context`` and a single empty ``GetTreeResponse`` is
            yielded.
        """
        self.__logger.debug(f"GetTree request from [{context.peer()}] "
                            f"([{printable_request_metadata(context)}])")

        try:
            instance = self._get_instance(request.instance_name)
            yield from instance.get_tree(request)

        except InvalidArgumentError as e:
            self._set_error(context, e, grpc.StatusCode.INVALID_ARGUMENT)

            # Only on the error path: a successful stream must not be
            # followed by an extra empty message.
            yield remote_execution_pb2.GetTreeResponse()

    # --- Private API ---

    def _set_error(self, context, error, status_code):
        """Log ``error`` and report it on ``context`` with ``status_code``."""
        self.__logger.error(error)
        context.set_details(str(error))
        context.set_code(status_code)

    def _get_instance(self, instance_name):
        """Return the instance registered under ``instance_name``.

        Raises:
            InvalidArgumentError: if no instance has that name.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            # The message already names the missing key, so the KeyError
            # context is suppressed.
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]") from None

150 

151 

class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    """gRPC servicer for the ByteStream API.

    The instance name and the blob hash/size are parsed out of the request's
    ``resource_name`` and the call is dispatched to the instance registered
    under that name. The error types handled by each method are logged and
    reported on the gRPC context.
    """

    def __init__(self, server):
        """Create the servicer and register it with ``server``.

        Args:
            server: gRPC server this servicer is added to.
        """
        self.__logger = logging.getLogger(__name__)

        # Maps instance name -> instance object, filled by add_instance().
        self._instances = {}

        bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)

    # --- Public API ---

    def add_instance(self, name, instance):
        """Register ``instance`` under ``name``, replacing any previous entry."""
        self._instances[name] = instance

    # --- Public API: Servicer ---

    @authorize(AuthContext)
    @DurationMetric(CAS_BYTESTREAM_READ_TIME_METRIC_NAME, MetricRecordDomain.CAS)
    def Read(self, request, context):
        """Stream a blob named by ``request.resource_name``.

        The resource name is expected to look like
        ``{instance_name}/blobs/{hash}/{size}``; the instance name may be
        empty and may itself contain ``/``.

        Yields:
            Whatever the instance's ``read`` yields. On
            ``InvalidArgumentError``, ``NotFoundError`` or
            ``OutOfRangeError`` the matching status code is set on
            ``context`` and a single empty ``ReadResponse`` is yielded.
        """
        self.__logger.debug(f"Read request from [{context.peer()}]")

        names = request.resource_name.split('/')

        try:
            instance_name = ''
            # Format: "{instance_name}/blobs/{hash}/{size}":
            if len(names) < 3 or names[-3] != 'blobs':
                raise InvalidArgumentError(
                    f"Invalid resource name: [{request.resource_name}]")

            elif names[0] != 'blobs':
                # Everything before the first 'blobs' segment is the
                # instance name.
                index = names.index('blobs')
                instance_name = '/'.join(names[:index])
                names = names[index:]

            if len(names) < 3:
                raise InvalidArgumentError(
                    f"Invalid resource name: [{request.resource_name}]")

            hash_, size_bytes = names[1], names[2]

            instance = self._get_instance(instance_name)

            yield from instance.read(hash_, size_bytes,
                                     request.read_offset, request.read_limit)

        except InvalidArgumentError as e:
            self._set_error(context, e, grpc.StatusCode.INVALID_ARGUMENT)
            yield bytestream_pb2.ReadResponse()

        except NotFoundError as e:
            self._set_error(context, e, grpc.StatusCode.NOT_FOUND)
            yield bytestream_pb2.ReadResponse()

        except OutOfRangeError as e:
            self._set_error(context, e, grpc.StatusCode.OUT_OF_RANGE)
            yield bytestream_pb2.ReadResponse()

    @authorize(AuthContext)
    @DurationMetric(CAS_BYTESTREAM_WRITE_TIME_METRIC_NAME, MetricRecordDomain.CAS)
    def Write(self, requests, context):
        """Write a blob from a stream of requests.

        The first request supplies the resource name, expected to look like
        ``{instance_name}/uploads/{uuid}/blobs/{hash}/{size}/{anything}``.
        Its ``data`` and the ``data`` of all remaining requests are handed
        to the instance's ``write``.

        Returns:
            The instance's ``write`` result. On ``NotImplementedError``,
            ``InvalidArgumentError``, ``NotFoundError`` or
            ``PermissionDeniedError`` the matching status code is set on
            ``context`` and an empty ``WriteResponse`` is returned. An empty
            request stream is reported as ``INVALID_ARGUMENT``.
        """
        self.__logger.debug(f"Write request from [{context.peer()}]")

        try:
            # A stream with no messages carries no resource name; report it
            # as a client error instead of leaking StopIteration.
            request = next(requests, None)
            if request is None:
                raise InvalidArgumentError("Invalid write request: empty request stream")

            names = request.resource_name.split('/')

            instance_name = ''
            # Format: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}/{anything}":
            if len(names) < 5 or 'uploads' not in names or 'blobs' not in names:
                raise InvalidArgumentError(f"Invalid resource name: [{request.resource_name}]")

            elif names[0] != 'uploads':
                # Everything before the first 'uploads' segment is the
                # instance name.
                index = names.index('uploads')
                instance_name = '/'.join(names[:index])
                names = names[index:]

            if len(names) < 5:
                raise InvalidArgumentError(f"Invalid resource name: [{request.resource_name}]")

            # names[1] is the upload uuid, which is not used here.
            hash_, size_bytes = names[3], names[4]

            instance = self._get_instance(instance_name)

            return instance.write(hash_, size_bytes, request.data,
                                  [following.data for following in requests])

        except NotImplementedError as e:
            self._set_error(context, e, grpc.StatusCode.UNIMPLEMENTED)

        except InvalidArgumentError as e:
            self._set_error(context, e, grpc.StatusCode.INVALID_ARGUMENT)

        except NotFoundError as e:
            self._set_error(context, e, grpc.StatusCode.NOT_FOUND)

        except PermissionDeniedError as e:
            self._set_error(context, e, grpc.StatusCode.PERMISSION_DENIED)

        return bytestream_pb2.WriteResponse()

    @authorize(AuthContext)
    def QueryWriteStatus(self, request, context):
        """Report ``UNIMPLEMENTED`` and return an empty ``QueryWriteStatusResponse``."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')

        return bytestream_pb2.QueryWriteStatusResponse()

    # --- Private API ---

    def _set_error(self, context, error, status_code):
        """Log ``error`` and report it on ``context`` with ``status_code``."""
        self.__logger.error(error)
        context.set_details(str(error))
        context.set_code(status_code)

    def _get_instance(self, instance_name):
        """Return the instance registered under ``instance_name``.

        Raises:
            InvalidArgumentError: if no instance has that name.
        """
        try:
            return self._instances[instance_name]

        except KeyError:
            # The message already names the missing key, so the KeyError
            # context is suppressed.
            raise InvalidArgumentError(f"Invalid instance name: [{instance_name}]") from None