Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/s3_cache.py: 90.00%

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
S3 Action Cache
===============

Implements an Action Cache using S3 to store cache entries.

"""

import io
import logging
from typing import Any, Dict, Optional

import boto3
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError

from buildgrid._enums import ActionCacheEntryType
from buildgrid._exceptions import (
    NotFoundError,
    StorageFullError
)
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    ActionResult,
    Digest,
)
from buildgrid.server.actioncache.caches.action_cache_abc import ActionCacheABC
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.s3 import s3utils



class S3ActionCache(ActionCacheABC):
    def __init__(self, storage: StorageABC, allow_updates: bool = True,
                 cache_failed_actions: bool = True,
                 entry_type: Optional[ActionCacheEntryType] = ActionCacheEntryType.ACTION_RESULT_DIGEST,
                 migrate_entries: Optional[bool] = False,
                 bucket: Optional[str] = None,
                 endpoint: Optional[str] = None, access_key: Optional[str] = None,
                 secret_key: Optional[str] = None,
                 config: Optional[BotoConfig] = BotoConfig()):
        """Initialises a new ActionCache instance using S3 to persist the action cache.

        Args:
            storage (StorageABC): storage backend instance to be used to store ActionResults.
            allow_updates (bool): whether to allow clients to write to the cache.
            cache_failed_actions (bool): whether to store failed (non-zero exit code)
                actions in the Action Cache.
            entry_type (ActionCacheEntryType): whether to store full ActionResults
                or only their digests.
            migrate_entries (bool): if set, entries whose stored value is of a
                different type than `entry_type` are rewritten in the configured
                format as they are accessed (False by default).
            bucket (str): name of the S3 bucket in which entries are stored.
            endpoint (str): URL of the S3 endpoint.
            access_key (str): S3 access key.
            secret_key (str): S3 secret key.
        """
        ActionCacheABC.__init__(self, storage=storage, allow_updates=allow_updates)
        self._logger = logging.getLogger(__name__)

        self._entry_type = entry_type
        self._migrate_entries = migrate_entries

        self._cache_failed_actions = cache_failed_actions
        assert bucket is not None
        self._bucket = bucket

        # Boto logs can be very verbose, restrict to WARNING
        for boto_logger_name in [
                'boto3', 'botocore',
                's3transfer', 'urllib3'
        ]:
            boto_logger = logging.getLogger(boto_logger_name)
            boto_logger.setLevel(max(boto_logger.level, logging.WARNING))

        # Only pass arguments with a value to support testing with moto_server
        client_kwargs: Dict[str, Any] = {}
        if endpoint is not None:
            client_kwargs["endpoint_url"] = endpoint
        if access_key is not None:
            client_kwargs["aws_access_key_id"] = access_key
        if secret_key is not None:
            client_kwargs["aws_secret_access_key"] = secret_key
        client_kwargs["config"] = config

        self._s3cache = boto3.client('s3', **client_kwargs)
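
    # Example construction (a hypothetical sketch: the bucket name, endpoint
    # URL and credentials below are illustrative, and `storage` is any
    # configured StorageABC backend):
    #
    #   cache = S3ActionCache(storage,
    #                         allow_updates=True,
    #                         cache_failed_actions=True,
    #                         entry_type=ActionCacheEntryType.ACTION_RESULT_DIGEST,
    #                         bucket='bgd-action-cache',
    #                         endpoint='http://localhost:9000',
    #                         access_key='my-access-key',
    #                         secret_key='my-secret-key')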


    # --- Public API ---
    @property
    def allow_updates(self) -> bool:
        return self._allow_updates

    def get_action_result(self, action_digest: Digest) -> ActionResult:
        """Retrieves the cached ActionResult for the given Action digest.

        Args:
            action_digest: The digest of the Action to get the result for.

        Returns:
            The cached ActionResult matching the given digest.

        Raises:
            NotFoundError: if there is no cached result for the digest, or if
                any blob referenced by the result is missing from CAS.
        """
        action_result = self._get_action_result(action_digest)
        if action_result is not None and self._action_result_blobs_still_exist(action_result):
            return action_result

        if self._allow_updates:
            self._logger.debug(f"Removing {action_digest.hash}/{action_digest.size_bytes} "
                               "from cache due to missing blobs in CAS")
            self._delete_key_from_cache(action_digest)

        raise NotFoundError(f"Key not found: {action_digest.hash}/{action_digest.size_bytes}")

    def update_action_result(self, action_digest: Digest,
                             action_result: ActionResult) -> None:
        """Stores the result in the cache for the given key.

        Args:
            action_digest (Digest): digest of the Action to update.
            action_result (ActionResult): ActionResult to store.
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        if self._cache_failed_actions or action_result.exit_code == 0:
            action_result_digest = self._storage.put_message(action_result)

            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST:
                self._update_cache_key(action_digest, action_result_digest.SerializeToString())
            else:
                self._update_cache_key(action_digest, action_result.SerializeToString())

            self._logger.info(
                f"Result cached for action [{action_digest.hash}/{action_digest.size_bytes}]")
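
    # Example round trip (a hypothetical sketch: assumes `cache` is an
    # S3ActionCache constructed as above, and `action_digest` is the Digest
    # of a REAPI Action):
    #
    #   result = ActionResult(exit_code=0)
    #   cache.update_action_result(action_digest, result)
    #   try:
    #       cached = cache.get_action_result(action_digest)
    #   except NotFoundError:
    #       pass  # no entry, or a referenced blob was evicted from CAS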


    # --- Private API ---
    def _get_action_result(self, digest: Digest) -> Optional[ActionResult]:
        """Get an `ActionResult` from the cache.

        If present, returns the `ActionResult` corresponding to the given
        digest, otherwise returns None.

        When configured to migrate entries, a value stored in a different
        format than `entry_type` is rewritten in the configured format as
        it is accessed.

        Args:
            digest: Action digest to look up the associated entry for.

        Returns:
            The `ActionResult`, or None if the digest is not present.
        """
        value_in_cache = self._get_value_from_cache(digest)
        if not value_in_cache:
            return None

        # Attempt to parse the entry as a `Digest` first:
        action_result_digest = Digest.FromString(value_in_cache)
        if len(action_result_digest.hash) == len(digest.hash):
            # The cache contains the `Digest` of the `ActionResult`:
            action_result = self._storage.get_message(action_result_digest,
                                                      ActionResult)

            # If configured, update the entry to contain an `ActionResult`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT and self._migrate_entries:
                self._logger.debug(f"Converting entry for {digest.hash}/{digest.size_bytes} "
                                   "from Digest to ActionResult")
                self._update_cache_key(digest, action_result.SerializeToString())

        else:
            action_result = ActionResult.FromString(value_in_cache)

            # If configured, update the entry to contain a `Digest`:
            if self._entry_type == ActionCacheEntryType.ACTION_RESULT_DIGEST and self._migrate_entries:
                self._logger.debug(f"Converting entry for {digest.hash}/{digest.size_bytes} "
                                   "from ActionResult to Digest")
                action_result_digest = self._storage.put_message(action_result)
                self._update_cache_key(digest, action_result_digest.SerializeToString())

        return action_result
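
    # Note on stored formats: an ACTION_RESULT_DIGEST entry holds a serialised
    # `Digest`, whose `hash` has the same length as the action digest's hash
    # (both use the same digest function), while an ACTION_RESULT entry holds
    # a serialised `ActionResult`, which in practice does not parse to a
    # `Digest` with a hash of that length. The length check above relies on
    # this to tell the two formats apart.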


    def _get_value_from_cache(self, digest: Digest) -> Optional[bytes]:
        """Get the bytes stored in the cache for the given Digest.

        Args:
            digest: Action digest to fetch the associated S3 value for.

        Returns:
            The stored bytes, or None if the digest is not present.
        """
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO()
            s3utils.get_object(self._s3cache, s3object)
            s3object.fileobj.seek(0)
            return s3object.fileobj.read()
        except ClientError as e:
            if e.response['Error']['Code'] not in ['404', 'NoSuchKey']:
                raise
            return None

    def _update_cache_key(self, digest: Digest, value: bytes) -> None:
        try:
            s3object = self._get_s3object(digest)
            s3object.fileobj = io.BytesIO(value)
            s3object.filesize = len(value)
            s3utils.put_object(self._s3cache, s3object)
        except ClientError as error:
            if error.response['Error']['Code'] == 'QuotaExceededException':
                raise StorageFullError("ActionCache S3 Quota Exceeded.") from error
            raise error

    def _delete_key_from_cache(self, digest: Digest) -> None:
        """Remove an entry from the ActionCache.

        Args:
            digest: digest of the entry to remove from the ActionCache.

        Returns:
            None
        """
        if not self._allow_updates:
            raise NotImplementedError("Updating cache not allowed")

        self._s3cache.delete_object(Bucket=self._bucket, Key=self._get_key(digest))

    @staticmethod
    def _get_key(digest: Digest) -> str:
        """
        Given a `Digest`, returns the key used to store its
        corresponding entry in S3.
        """
        return f'{digest.hash}_{digest.size_bytes}'
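        # e.g. a Digest with hash 'a2b4...' (illustrative) and size_bytes 144
        # is stored under the S3 key 'a2b4..._144'.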


    def _get_s3object(self, digest: Digest) -> s3utils.S3Object:
        return s3utils.S3Object(self._bucket, self._get_key(digest))