Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/cleanup.py: 39.85% of 133 statements (coverage.py v6.4.1, created at 2022-06-22 21:04 +0000)

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
import logging
import signal
import time
from datetime import datetime

from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.server.monitoring import (
    get_monitoring_bus,
    setup_monitoring_bus
)
from buildgrid.server.metrics_names import (
    CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
    CLEANUP_BYTES_DELETION_RATE_METRIC_NAME,
    CLEANUP_INDEX_BULK_DELETE_METRIC_NAME,
    CLEANUP_INDEX_MARK_DELETED_METRIC_NAME,
    CLEANUP_RUNTIME_METRIC_NAME,
    CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME,
    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME
)
from buildgrid.server.metrics_utils import (
    publish_counter_metric,
    publish_gauge_metric,
    DurationMetric
)


class CASCleanUp:
    """Creates an LRU CAS cleanup service."""

    def __init__(self, dry_run, high_watermark, low_watermark, sleep_interval, batch_size,
                 only_if_unused_for, storages, indexes, monitor, mon_endpoint_type=None,
                 mon_endpoint_location=None, mon_serialisation_format=None, mon_metric_prefix=None):
        self._logger = logging.getLogger(__name__)

        self._dry_run = dry_run

        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._storages = storages
        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

        self._main_loop = asyncio.get_event_loop()

        if self._is_instrumented:
            setup_monitoring_bus(
                endpoint_type=mon_endpoint_type,
                endpoint_location=mon_endpoint_location,
                metric_prefix=mon_metric_prefix,
                serialisation_format=mon_serialisation_format)

    # --- Public API ---

    def start(self, *, on_server_start_cb=None):
        """Start the cleanup service."""
        self._main_loop.add_signal_handler(signal.SIGTERM, self.stop)
        if self._is_instrumented:
            monitoring_bus = get_monitoring_bus()
            monitoring_bus.start()

        # Retry on errors
        attempts = 0
        while True:
            tasks = []
            for instance_name in self._storages:
                if self._storages[instance_name].is_cleanup_enabled():
                    if self._dry_run:
                        self._calculate_cleanup(instance_name)
                    else:
                        tasks.append(self._cleanup_worker(instance_name))
                else:
                    self._logger.info(f"CleanUp for instance '{instance_name}' skipped.")
            if not self._dry_run:
                fut = self._gather_tasks(*tasks)
                try:
                    self._main_loop.run_until_complete(fut)
                except asyncio.CancelledError:
                    self._logger.info("Cleanup task cancelled, exiting")
                except Exception as e:
                    self._logger.exception(e)
                    # Exponential backoff before retrying (1s, 1.6s, 2.56s, ...)
                    sleep_time = 1.6 ** attempts
                    self._logger.info(f"Retrying Cleanup in {sleep_time} seconds")
                    time.sleep(sleep_time)
                    attempts += 1
                    continue
            # Only retry on errors
            break

    def stop(self):
        """Stop the cleanup service."""
        self._logger.info("Stopping Cleanup Service")
        if self._is_instrumented:
            monitoring_bus = get_monitoring_bus()
            monitoring_bus.stop()
        if not self._dry_run:
            tasks = [t for t in asyncio.all_tasks(self._main_loop)
                     if t is not asyncio.current_task(self._main_loop)]
            for task in tasks:
                task.cancel()

    # --- Private API ---

    async def _gather_tasks(self, *tasks):
        await asyncio.gather(*tasks)

    def _calculate_cleanup(self, instance_name):
        """Work out which blobs would be deleted by the cleanup command.

        Args:
            instance_name (str): The instance to do a cleanup dry-run for.

        """
        self._logger.info(f"Cleanup dry run for instance '{instance_name}'")
        index = self._indexes[instance_name]
        only_delete_before = self._get_last_accessed_threshold()
        total_size = index.get_total_size()
        self._logger.info(
            f"CAS size is {total_size} bytes, compared with a high water mark of "
            f"{self._high_watermark} bytes and a low water mark of {self._low_watermark} bytes.")
        if total_size >= self._high_watermark:
            required_space = total_size - self._low_watermark
            digests = index.mark_n_bytes_as_deleted(required_space, self._dry_run,
                                                    protect_blobs_after=only_delete_before)
            cleared_space = sum(digest.size_bytes for digest in digests)
            self._logger.info(
                f"{len(digests)} digests would be deleted, freeing up {cleared_space} bytes.")
        else:
            self._logger.info(
                f"Total size {total_size} is less than the high water mark, "
                f"nothing will be deleted.")

    def _do_cleanup_batch(self, instance_name, index, storage, only_delete_before, total_size):
        batch_start_time = time.time()

        # Mark a batch of index entries for deletion
        with DurationMetric(CLEANUP_INDEX_MARK_DELETED_METRIC_NAME,
                            instance_name,
                            instanced=True):
            digests = index.mark_n_bytes_as_deleted(self._batch_size,
                                                    protect_blobs_after=only_delete_before)

        if not digests and total_size >= self._high_watermark:
            self._logger.error("Marked 0 digests for deletion, even though cleanup was triggered. "
                               "This may be because the remaining digests have been accessed since "
                               f"{only_delete_before}. Total size still remains greater than the "
                               "high watermark!")
        elif not digests:
            self._logger.warning("Marked 0 digests for deletion, even though cleanup was triggered. "
                                 "This may be because the remaining digests have been accessed since "
                                 f"{only_delete_before}.")
        else:
            self._logger.info(f"Marked {len(digests)} digests for deletion in the index")

        # Bulk delete the marked blobs from the actual storage backend
        with DurationMetric(CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME,
                            instance_name,
                            instanced=True):
            failed_deletions = storage.bulk_delete(digests)
        self._logger.info(f"Requested bulk deletion of {len(digests)} digests from storage")

        if failed_deletions:
            # Separately handle blobs which failed to be deleted and blobs that
            # were already missing from the storage
            if self._is_instrumented:
                publish_counter_metric(
                    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME,
                    len(failed_deletions),
                    {"instance-name": instance_name}
                )

            self._logger.info(f"Failed to delete {len(failed_deletions)} blobs.")
            for failure in failed_deletions:
                self._logger.debug(f"Failed to delete {failure}.")

            digests_to_delete = [
                digest for digest in digests
                if digest.hash not in failed_deletions
            ]
        else:
            if failed_deletions is None:
                self._logger.error("Calling bulk_delete on storage returned 'None' instead of a list")
            digests_to_delete = digests

        # Bulk delete the index entries for the blobs that were successfully
        # deleted from (or already missing in) the storage
        with DurationMetric(CLEANUP_INDEX_BULK_DELETE_METRIC_NAME,
                            instance_name,
                            instanced=True):
            index_failed_deletions = index.bulk_delete(digests_to_delete)
        self._logger.info(f"Bulk deleted {len(digests_to_delete)} digests from the index")

        if index_failed_deletions is None:
            self._logger.error("Calling bulk_delete on the index returned 'None' instead of a list")

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            blobs_deleted_per_second = len(digests_to_delete) / batch_duration
            publish_gauge_metric(
                CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
                blobs_deleted_per_second,
                {"instance-name": instance_name}
            )

            bytes_deleted = sum(digest.size_bytes for digest in digests_to_delete)
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(
                CLEANUP_BYTES_DELETION_RATE_METRIC_NAME,
                bytes_deleted_per_second,
                {"instance-name": instance_name}
            )

    async def _cleanup_worker(self, instance_name):
        """Clean up an instance's CAS once it grows past the high watermark."""
        storage = self._storages[instance_name]
        if isinstance(storage, WithCacheStorage):
            self._logger.warning("Cleaning up a WithCache storage will not clean up local cache "
                                 "entries; running with a total cache size larger than the configured "
                                 "low watermark may result in inconsistent cache state.")
        index = self._indexes[instance_name]
        index.set_instance_name(instance_name)
        storage.set_instance_name(instance_name)
        self._logger.info(f"Cleanup for instance '{instance_name}' started.")
        while True:
            total_size = index.get_total_size()
            if total_size >= self._high_watermark:
                to_delete = total_size - self._low_watermark
                self._logger.info(
                    f"CAS size for instance '{instance_name}' is {total_size} bytes, at least "
                    f"{to_delete} bytes will be cleared.")
                self._logger.info(f"Deleting items from storage/index for instance '{instance_name}'.")

                with DurationMetric(CLEANUP_RUNTIME_METRIC_NAME,
                                    instance_name,
                                    instanced=True):
                    while total_size > self._low_watermark:
                        only_delete_before = self._get_last_accessed_threshold()
                        await self._main_loop.run_in_executor(None, self._do_cleanup_batch,
                                                              instance_name, index,
                                                              storage, only_delete_before,
                                                              total_size)
                        total_size = index.get_total_size()
                        self._logger.info(f"After cleanup batch total CAS size is {total_size} bytes.")

                self._logger.info(f"Finished cleanup. CAS size is now {total_size} bytes.")
            await asyncio.sleep(self._sleep_interval)

    def _get_last_accessed_threshold(self):
        # Blobs accessed on or after this timestamp are protected from deletion.
        return datetime.utcnow() - self._only_if_unused_for
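

# Illustrative usage sketch (not part of the original module): a minimal
# dry run against stub objects. The stubs implement only the methods this
# module actually calls on its index/storage collaborators; the stub class
# names and the example watermark values are assumptions for illustration.
# A real deployment wires in BuildGrid's configured index and storage
# backends via the command-line entry point. Note that start() registers a
# SIGTERM handler, so this sketch assumes a POSIX event loop.
if __name__ == "__main__":
    from collections import namedtuple
    from datetime import timedelta

    logging.basicConfig(level=logging.INFO)

    # Hypothetical stand-in for a CAS digest; only .hash and .size_bytes
    # are used by the dry-run code path above.
    _Digest = namedtuple("_Digest", ["hash", "size_bytes"])

    class _StubIndex:
        """Index stub implementing only what the dry run needs."""

        def __init__(self, digests):
            self._digests = digests

        def get_total_size(self):
            return sum(d.size_bytes for d in self._digests)

        def mark_n_bytes_as_deleted(self, n_bytes, dry_run=False, protect_blobs_after=None):
            # Pretend every blob is stale; report blobs until n_bytes is covered.
            marked, marked_size = [], 0
            for digest in self._digests:
                if marked_size >= n_bytes:
                    break
                marked.append(digest)
                marked_size += digest.size_bytes
            return marked

    class _StubStorage:
        """Storage stub; cleanup is enabled, and a dry run never deletes."""

        def is_cleanup_enabled(self):
            return True

    digests = [_Digest(hash=f"blob-{i}", size_bytes=400) for i in range(5)]
    cleanup = CASCleanUp(
        dry_run=True,            # only report what would be deleted
        high_watermark=1500,     # bytes; cleanup triggers at or above this size
        low_watermark=1000,      # bytes; cleanup aims to get below this size
        sleep_interval=60,
        batch_size=500,
        only_if_unused_for=timedelta(hours=2),
        storages={"example-instance": _StubStorage()},
        indexes={"example-instance": _StubIndex(digests)},
        monitor=False,
    )
    # With 2000 bytes stored, the dry run reports that 3 blobs (1200 bytes)
    # would be deleted to get below the 1000-byte low watermark.
    cleanup.start()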