Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/utils/decorators.py: 57.95%

195 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

1# Copyright (C) 2023 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import itertools 

17import logging 

18import uuid 

19from functools import wraps 

20from typing import Any, Callable, Iterator, Type, TypeVar, cast 

21 

22import grpc 

23from google.protobuf.message import Message 

24 

25from buildgrid._exceptions import ( 

26 BotSessionCancelledError, 

27 BotSessionClosedError, 

28 BotSessionMismatchError, 

29 DuplicateBotSessionError, 

30 FailedPreconditionError, 

31 InvalidArgumentError, 

32 NotFoundError, 

33 PermissionDeniedError, 

34 RetriableError, 

35 StorageFullError, 

36 UnknownBotSessionError, 

37) 

38from buildgrid.server.context import ctx_grpc_request_id 

39 

# Type variables used in the decorator signatures in this module. The request-ID
# decorators are typed as ``Callable[[_Self, _Req, _Ctx], _Res]``:
#   _Self - the first positional argument (named ``self`` in the wrappers)
#   _Req  - the request argument
#   _Ctx  - the context argument
#   _Res  - the value returned (or yielded, for generators) by the handler
_Res = TypeVar("_Res")
_Self = TypeVar("_Self")
_Req = TypeVar("_Req")
_Ctx = TypeVar("_Ctx")


# Module-level logger, named after this module; used by the error-handling decorators.
LOGGER = logging.getLogger(__name__)

47 

48 

def track_request_id(f: Callable[[_Self, _Req, _Ctx], _Res]) -> Callable[[_Self, _Req, _Ctx], _Res]:
    """Wrap ``f`` so that a request ID is published while it runs.

    Before ``f`` is called, the ``ctx_grpc_request_id`` ContextVar is set
    to the string form of a freshly generated UUID4. Once ``f`` returns
    or raises, the ContextVar is set to ``None`` again. The ContextVar is
    used in logging output so that log lines belonging to the same
    request can be identified.

    """

    @wraps(f)
    def tracked(*args: Any, **kwargs: Any) -> _Res:
        request_id = str(uuid.uuid4())
        ctx_grpc_request_id.set(request_id)
        try:
            result = f(*args, **kwargs)
        finally:
            # Always clear the ID, including when ``f`` raises.
            ctx_grpc_request_id.set(None)
        return result

    return tracked

68 

69 

def track_request_id_generator(
    f: Callable[[_Self, _Req, _Ctx], Iterator[_Res]]
) -> Callable[[_Self, _Req, _Ctx], Iterator[_Res]]:
    """Generator flavour of ``track_request_id``.

    The returned wrapper is itself a generator function. When iteration
    starts, the ``ctx_grpc_request_id`` ContextVar is set to the string
    form of a new UUID4; every item produced by ``f`` is then delegated
    through unchanged. When the delegated iteration finishes, raises, or
    the generator is closed, the ContextVar is set to ``None``.

    """

    @wraps(f)
    def tracked(*args: Any, **kwargs: Any) -> Iterator[_Res]:
        request_id = str(uuid.uuid4())
        ctx_grpc_request_id.set(request_id)
        try:
            responses = f(*args, **kwargs)
            # ``yield from`` (rather than a manual loop) keeps send/throw/close
            # delegation to the wrapped iterator intact.
            yield from responses
        finally:
            ctx_grpc_request_id.set(None)

    return tracked

89 

90 

# Type variable bound to any callable. The error-handling decorator factories
# return ``Callable[[Func], Func]`` so that, for type checking, a decorated
# function keeps its own signature.
Func = TypeVar("Func", bound=Callable)  # type: ignore[type-arg]

92 

93 

def handle_errors_unary_unary(
    fallback_return_type: Type[Message], get_printable_request: Callable[[Any], Any] = lambda r: str(r)
) -> Callable[[Func], Func]:
    """Build a decorator that converts exceptions from a unary-unary handler into gRPC statuses.

    The decorated callable is invoked as ``f(self, request, context)``. When it
    returns normally its result is passed straight through. When it raises one
    of the exceptions below, the exception is logged, ``str(exception)`` is set
    as the details on ``context`` together with the listed status code, and a
    default-constructed ``fallback_return_type`` is returned instead:

    ==========================================================  ====================
    Exception                                                   Status code
    ==========================================================  ====================
    BotSessionCancelledError                                    CANCELLED
    BotSessionClosedError                                       DATA_LOSS
    ConnectionError                                             UNAVAILABLE
    DuplicateBotSessionError                                    ABORTED
    FailedPreconditionError                                     FAILED_PRECONDITION
    InvalidArgumentError / BotSessionMismatchError /
    UnknownBotSessionError                                      INVALID_ARGUMENT
    NotFoundError                                               NOT_FOUND
    NotImplementedError                                         UNIMPLEMENTED
    PermissionDeniedError                                       PERMISSION_DENIED
    StorageFullError                                            RESOURCE_EXHAUSTED
    any other ``Exception``                                     INTERNAL
    ==========================================================  ====================

    ``RetriableError`` is the exception to this pattern: the retry delay is
    logged and ``context.abort_with_status`` is called with the error's status.

    Args:
        fallback_return_type: Class called with no arguments to build the
            response returned after a handled error.
        get_printable_request: Called with the request to produce the text
            embedded in the log message for unexpected errors.

    Returns:
        A decorator to apply to a ``(self, request, context)`` handler.
    """

    def _fail(context: grpc.ServicerContext, error: BaseException, code: grpc.StatusCode) -> None:
        # Record the failure on the RPC context: details first, then the status code.
        context.set_details(str(error))
        context.set_code(code)

    def decorator(f: Func) -> Func:
        @wraps(f)
        def guarded(self: _Self, request: _Req, context: grpc.ServicerContext) -> Any:
            # The order of the handlers is significant and mirrors the original
            # chain: the first matching ``except`` clause wins.
            try:
                return f(self, request, context)

            except BotSessionCancelledError as err:
                LOGGER.info(err)
                _fail(context, err, grpc.StatusCode.CANCELLED)

            except BotSessionClosedError as err:
                LOGGER.debug(err)
                _fail(context, err, grpc.StatusCode.DATA_LOSS)

            except ConnectionError as err:
                LOGGER.exception(err)
                _fail(context, err, grpc.StatusCode.UNAVAILABLE)

            except DuplicateBotSessionError as err:
                LOGGER.info(err)
                _fail(context, err, grpc.StatusCode.ABORTED)

            except FailedPreconditionError as err:
                LOGGER.error(err)
                _fail(context, err, grpc.StatusCode.FAILED_PRECONDITION)

            except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as err:
                LOGGER.info(err)
                _fail(context, err, grpc.StatusCode.INVALID_ARGUMENT)

            except NotFoundError as err:
                LOGGER.debug(err)
                _fail(context, err, grpc.StatusCode.NOT_FOUND)

            except NotImplementedError as err:
                LOGGER.info(err)
                _fail(context, err, grpc.StatusCode.UNIMPLEMENTED)

            except PermissionDeniedError as err:
                LOGGER.exception(err)
                _fail(context, err, grpc.StatusCode.PERMISSION_DENIED)

            except RetriableError as err:
                LOGGER.info(f"Retriable error, client should retry in: {err.retry_info.retry_delay}")
                # NOTE(review): gRPC documents abort_with_status as raising to
                # terminate the RPC, so the fallback below is presumably not
                # reached on this path -- confirm against the grpc version in use.
                context.abort_with_status(err.error_status)

            except StorageFullError as err:
                LOGGER.exception(err)
                _fail(context, err, grpc.StatusCode.RESOURCE_EXHAUSTED)

            except Exception as err:
                LOGGER.exception(f"Unexpected error in {f.__name__}; request=[{get_printable_request(request)}]")
                _fail(context, err, grpc.StatusCode.INTERNAL)

            # Reached only after one of the handlers above has run to completion.
            return fallback_return_type()

        return cast(Func, guarded)

    return decorator

167 

168 

def handle_errors_stream_unary(
    fallback_return_type: Type[Message], get_printable_request: Callable[[Any], Any] = lambda r: str(r)
) -> Callable[[Func], Func]:
    """Build a decorator that converts exceptions from a stream-unary handler into gRPC statuses.

    The wrapper pulls the first message off the request iterator (so it can be
    included in the log message for unexpected errors), then calls
    ``f(self, requests, context)`` where ``requests`` yields that first message
    followed by the rest of the original iterator.

    When ``f`` returns normally its result is passed through. When an exception
    listed below is raised, it is logged, ``str(exception)`` is set as the
    details on ``context`` together with the listed status code, and a
    default-constructed ``fallback_return_type`` is returned:

    * ``BotSessionCancelledError`` -> ``CANCELLED``
    * ``BotSessionClosedError`` -> ``DATA_LOSS``
    * ``ConnectionError`` -> ``UNAVAILABLE``
    * ``DuplicateBotSessionError`` -> ``ABORTED``
    * ``FailedPreconditionError`` -> ``FAILED_PRECONDITION``
    * ``InvalidArgumentError`` / ``BotSessionMismatchError`` /
      ``UnknownBotSessionError`` -> ``INVALID_ARGUMENT``
    * ``NotFoundError`` -> ``NOT_FOUND``
    * ``NotImplementedError`` -> ``UNIMPLEMENTED``
    * ``PermissionDeniedError`` -> ``PERMISSION_DENIED``
    * ``StorageFullError`` -> ``RESOURCE_EXHAUSTED``
    * any other ``Exception`` -> ``INTERNAL``

    ``RetriableError`` instead logs the retry delay and calls
    ``context.abort_with_status`` with the error's status.

    Failures while reading the first request (for example ``StopIteration``
    from an empty stream) are handled by the same chain; in that case the
    unexpected-error log line shows ``<no request received>`` in place of the
    request.

    Args:
        fallback_return_type: Class called with no arguments to build the
            response returned after a handled error.
        get_printable_request: Called with the first request message to produce
            the text embedded in the log message for unexpected errors.

    Returns:
        A decorator to apply to a ``(self, request_iterator, context)`` handler.
    """

    def decorator(f: Func) -> Func:
        @wraps(f)
        def wrapper(self: _Self, request: Iterator[_Req], context: grpc.ServicerContext) -> Any:
            # Bound before the ``try`` so the catch-all handler can reference it
            # even when ``next(request)`` itself raises. Previously that case
            # hit an unbound local inside the handler and the resulting
            # UnboundLocalError escaped the decorator.
            initial_request: Any = None
            try:
                initial_request = next(request)
                # Re-attach the consumed first message so ``f`` sees the full stream.
                return f(self, itertools.chain([initial_request], request), context)

            except BotSessionCancelledError as e:
                LOGGER.info(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.CANCELLED)

            except BotSessionClosedError as e:
                LOGGER.debug(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.DATA_LOSS)

            except ConnectionError as e:
                LOGGER.exception(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.UNAVAILABLE)

            except DuplicateBotSessionError as e:
                LOGGER.info(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.ABORTED)

            except FailedPreconditionError as e:
                LOGGER.error(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.FAILED_PRECONDITION)

            except (InvalidArgumentError, BotSessionMismatchError, UnknownBotSessionError) as e:
                LOGGER.info(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

            except NotFoundError as e:
                LOGGER.debug(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.NOT_FOUND)

            except NotImplementedError as e:
                LOGGER.info(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.UNIMPLEMENTED)

            except PermissionDeniedError as e:
                LOGGER.exception(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.PERMISSION_DENIED)

            except RetriableError as e:
                LOGGER.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
                # NOTE(review): gRPC documents abort_with_status as raising to
                # terminate the RPC, so the fallback below is presumably not
                # reached on this path -- confirm against the grpc version in use.
                context.abort_with_status(e.error_status)

            except StorageFullError as e:
                LOGGER.exception(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)

            except Exception as e:
                # Only format the request if one was actually received; the
                # caller-supplied formatter is not expected to cope with None.
                if initial_request is None:
                    printable_request: Any = "<no request received>"
                else:
                    printable_request = get_printable_request(initial_request)
                LOGGER.exception(f"Unexpected error in {f.__name__}; request=[{printable_request}]")
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.INTERNAL)

            # Reached only after one of the handlers above has run to completion.
            return fallback_return_type()

        return cast(Func, wrapper)

    return decorator

245 

246 

def handle_errors_unary_stream(
    fallback_return_type: Type[Message],
    get_printable_request: Callable[[Any], Any] = lambda r: str(r),
) -> Callable[[Func], Func]:
    """Build a decorator that converts exceptions from a unary-stream handler into gRPC statuses.

    The wrapper is a generator: it delegates to ``f(self, request, context)``
    and passes every yielded response through. When an exception listed below
    is raised during iteration, it is logged, ``str(exception)`` is set as the
    details on ``context`` together with the listed status code, and a single
    default-constructed ``fallback_return_type`` is yielded as the final item:

    * ``ConnectionError`` -> ``UNAVAILABLE``
    * ``FailedPreconditionError`` -> ``FAILED_PRECONDITION``
    * ``InvalidArgumentError`` -> ``INVALID_ARGUMENT``
    * ``NotFoundError`` -> ``NOT_FOUND``
    * ``NotImplementedError`` -> ``UNIMPLEMENTED``
    * ``PermissionDeniedError`` -> ``PERMISSION_DENIED``
    * ``StorageFullError`` -> ``RESOURCE_EXHAUSTED``
    * any other ``Exception`` -> ``INTERNAL``

    ``RetriableError`` instead logs the retry delay and calls
    ``context.abort_with_status`` with the error's status; no fallback message
    is yielded on that path.

    Args:
        fallback_return_type: Class called with no arguments to build the
            message yielded after a handled error.
        get_printable_request: Called with the request to produce the text
            embedded in the log message for unexpected errors. Accepted for
            parity with the unary-unary and stream-unary decorators; the
            default (``str``) yields the same log text as interpolating the
            request directly.

    Returns:
        A decorator to apply to a ``(self, request, context)`` generator handler.
    """

    def decorator(f: Func) -> Func:
        @wraps(f)
        def wrapper(self: _Self, request: _Req, context: grpc.ServicerContext) -> Iterator[Any]:
            try:
                yield from f(self, request, context)

            except ConnectionError as e:
                LOGGER.exception(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.UNAVAILABLE)
                yield fallback_return_type()

            except FailedPreconditionError as e:
                LOGGER.error(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                yield fallback_return_type()

            except InvalidArgumentError as e:
                LOGGER.info(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                yield fallback_return_type()

            except NotFoundError as e:
                LOGGER.debug(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.NOT_FOUND)
                yield fallback_return_type()

            except NotImplementedError as e:
                LOGGER.info(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.UNIMPLEMENTED)
                yield fallback_return_type()

            except PermissionDeniedError as e:
                LOGGER.exception(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.PERMISSION_DENIED)
                yield fallback_return_type()

            except RetriableError as e:
                LOGGER.info(f"Retriable error, client should retry in: {e.retry_info.retry_delay}")
                # NOTE(review): gRPC documents abort_with_status as raising to
                # terminate the RPC; nothing is yielded on this path either way.
                context.abort_with_status(e.error_status)

            except StorageFullError as e:
                LOGGER.exception(e)
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                yield fallback_return_type()

            except Exception as e:
                LOGGER.exception(f"Unexpected error in {f.__name__}; request=[{get_printable_request(request)}]")
                context.set_details(str(e))
                context.set_code(grpc.StatusCode.INTERNAL)
                yield fallback_return_type()

        return cast(Func, wrapper)

    return decorator