Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/cleanup.py: 39.37%

127 statements  

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import signal
import time
from datetime import datetime

from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.server.monitoring import (
    get_monitoring_bus,
    setup_monitoring_bus
)
from buildgrid.server.metrics_names import (
    CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
    CLEANUP_BYTES_DELETION_RATE_METRIC_NAME,
    CLEANUP_INDEX_BULK_DELETE_METRIC_NAME,
    CLEANUP_INDEX_MARK_DELETED_METRIC_NAME,
    CLEANUP_RUNTIME_METRIC_NAME,
    CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME,
    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME
)
from buildgrid.server.metrics_utils import (
    publish_counter_metric,
    publish_gauge_metric,
    DurationMetric
)


class CASCleanUp:
    """Creates an LRU CAS cleanup service."""

    def __init__(self, dry_run, high_watermark, low_watermark, sleep_interval, batch_size,
                 only_if_unused_for, storages, indexes, monitor, mon_endpoint_type=None,
                 mon_endpoint_location=None, mon_serialisation_format=None, mon_metric_prefix=None):
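        """Set up the cleanup service.

        The watermarks, ``batch_size``, and ``only_if_unused_for`` control
        when cleanup triggers, how much it deletes per batch, and which
        recently used blobs to protect; ``storages`` and ``indexes`` map
        instance names to their backends, and the ``mon_*`` arguments
        configure the optional monitoring bus.
        """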

        self._logger = logging.getLogger(__name__)

        self._dry_run = dry_run

        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._storages = storages
        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

        self._main_loop = asyncio.get_event_loop()

        if self._is_instrumented:
            setup_monitoring_bus(
                endpoint_type=mon_endpoint_type,
                endpoint_location=mon_endpoint_location,
                metric_prefix=mon_metric_prefix,
                serialisation_format=mon_serialisation_format)

    # --- Public API ---

    def start(self, *, on_server_start_cb=None):
        """Start the cleanup service."""
        self._main_loop.add_signal_handler(signal.SIGTERM, self.stop)
        if self._is_instrumented:
            monitoring_bus = get_monitoring_bus()
            monitoring_bus.start()

        # Retry on errors
        attempts = 0
        while True:
            tasks = []
            for instance_name in self._storages:
                if self._storages[instance_name].is_cleanup_enabled():
                    if self._dry_run:
                        self._calculate_cleanup(instance_name)
                    else:
                        tasks.append(self._cleanupWorker(instance_name))
                else:
                    self._logger.info(f"CleanUp for instance '{instance_name}' skipped.")
            if not self._dry_run:
                fut = asyncio.gather(*tasks, loop=self._main_loop)
                try:
                    self._main_loop.run_until_complete(fut)
                except Exception as e:
                    self._logger.exception(e)
                    # Exponential backoff before retrying
                    sleep_time = 1.6 ** attempts
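                    # (Illustrative values: successive retries wait roughly
                    # 1.0, 1.6, 2.56, 4.1, ... seconds.)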

                    self._logger.info(f"Retrying Cleanup in {sleep_time} seconds")
                    time.sleep(sleep_time)
                    attempts += 1
                    continue
            # Only retry on errors
            break

    def stop(self):
        """Stop the cleanup service."""
        self._logger.info("Stopping Cleanup Service")
        if self._is_instrumented:
            monitoring_bus = get_monitoring_bus()
            monitoring_bus.stop()
        if not self._dry_run:
            self._main_loop.stop()

    # --- Private API ---

    def _calculate_cleanup(self, instance_name):
        """Work out which blobs would be deleted by the cleanup command.

        Args:
            instance_name (str): The instance to do a cleanup dry-run for.

        """
        self._logger.info(f"Cleanup dry run for instance '{instance_name}'")
        index = self._indexes[instance_name]
        only_delete_before = self._get_last_accessed_threshold()
        total_size = index.get_total_size()
        self._logger.info(
            f"CAS size is {total_size} bytes, compared with a high water mark of "
            f"{self._high_watermark} bytes and a low water mark of {self._low_watermark} bytes.")
        if total_size >= self._high_watermark:
            required_space = total_size - self._low_watermark
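            # For example (illustrative numbers only): with a 10 GB high
            # watermark, an 8 GB low watermark, and an 11 GB total size,
            # cleanup triggers and required_space is 11 GB - 8 GB = 3 GB.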

            digests = index.mark_n_bytes_as_deleted(required_space, self._dry_run,
                                                    protect_blobs_after=only_delete_before)
            cleared_space = sum(digest.size_bytes for digest in digests)
            self._logger.info(
                f"{len(digests)} digests will be deleted, freeing up {cleared_space} bytes.")
        else:
            self._logger.info(
                f"Total size {total_size} is less than the high water mark, "
                f"nothing will be deleted.")

    def _do_cleanup_batch(self, instance_name, index, storage, only_delete_before, total_size):
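        """Mark one batch of index entries for deletion, bulk delete the
        corresponding blobs from storage, then drop the successfully deleted
        (and already missing) entries from the index.
        """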

        batch_start_time = time.time()

        # Mark a batch of index entries for deletion
        with DurationMetric(CLEANUP_INDEX_MARK_DELETED_METRIC_NAME,
                            instance_name,
                            instanced=True):
            digests = index.mark_n_bytes_as_deleted(self._batch_size,
                                                    protect_blobs_after=only_delete_before)

        if not digests and total_size >= self._high_watermark:
            self._logger.error(f"Marked 0 digests for deletion, even though cleanup was triggered. "
                               f"This may be because the remaining digests have been accessed since "
                               f"{only_delete_before}. Total size still remains greater than the high watermark!")
        elif not digests:
            self._logger.warning(f"Marked 0 digests for deletion, even though cleanup was triggered. "
                                 f"This may be because the remaining digests have been accessed since "
                                 f"{only_delete_before}.")
        else:
            self._logger.info(f"Marked {len(digests)} digests for deletion in the index")

        # Bulk delete the marked blobs from the actual storage backend
        with DurationMetric(CLEANUP_STORAGE_BULK_DELETE_METRIC_NAME,
                            instance_name,
                            instanced=True):
            failed_deletions = storage.bulk_delete(digests)
        self._logger.info(f"Requested bulk deletion of {len(digests)} digests from storage")

        if failed_deletions:
            # Separately handle blobs which failed to be deleted and blobs that
            # were already missing from the storage
            if self._is_instrumented:
                publish_counter_metric(
                    CLEANUP_STORAGE_DELETION_FAILURES_METRIC_NAME,
                    len(failed_deletions),
                    {"instance-name": instance_name}
                )

            self._logger.info(f"Failed to delete {len(failed_deletions)} blobs.")
            for failure in failed_deletions:
                self._logger.debug(f"Failed to delete {failure}.")

            digests_to_delete = [
                digest for digest in digests
                if digest.hash not in failed_deletions
            ]
        else:
            if failed_deletions is None:
                self._logger.error("Calling bulk_delete on storage returned 'None' instead of a list")
            digests_to_delete = digests

        # Bulk delete the index entries for the blobs that were successfully
        # deleted from (or already missing in) storage
        with DurationMetric(CLEANUP_INDEX_BULK_DELETE_METRIC_NAME,
                            instance_name,
                            instanced=True):
            index_failed_deletions = index.bulk_delete(digests_to_delete)
        self._logger.info(f"Bulk deleted {len(digests_to_delete)} digests from the index")

        if index_failed_deletions is None:
            self._logger.error("Calling bulk_delete on the index returned 'None' instead of a list")

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            blobs_deleted_per_second = len(digests_to_delete) / batch_duration
            publish_gauge_metric(
                CLEANUP_BLOBS_DELETION_RATE_METRIC_NAME,
                blobs_deleted_per_second,
                {"instance-name": instance_name}
            )

            bytes_deleted = sum(digest.size_bytes for digest in digests_to_delete)
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(
                CLEANUP_BYTES_DELETION_RATE_METRIC_NAME,
                bytes_deleted_per_second,
                {"instance-name": instance_name}
            )

    async def _cleanupWorker(self, instance_name):
        """Run cleanup whenever the CAS size exceeds the high watermark."""
        storage = self._storages[instance_name]
        if isinstance(storage, WithCacheStorage):
            self._logger.warning("Cleaning up a WithCache storage will not clean up local cache entries; "
                                 "running with a total cache size larger than the configured low watermark "
                                 "may result in inconsistent cache state.")
        index = self._indexes[instance_name]
        index.set_instance_name(instance_name)
        storage.set_instance_name(instance_name)
        self._logger.info(f"Cleanup for instance '{instance_name}' started.")
        while True:
            total_size = index.get_total_size()
            if total_size >= self._high_watermark:
                to_delete = total_size - self._low_watermark
                self._logger.info(
                    f"CAS size for instance '{instance_name}' is {total_size} bytes, at least "
                    f"{to_delete} bytes will be cleared.")
                self._logger.info(f"Deleting items from storage/index for instance '{instance_name}'.")

                with DurationMetric(CLEANUP_RUNTIME_METRIC_NAME,
                                    instance_name,
                                    instanced=True):
                    while total_size > self._low_watermark:
                        only_delete_before = self._get_last_accessed_threshold()
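                        # _do_cleanup_batch performs blocking index/storage I/O,
                        # so it runs in the default thread pool executor to keep
                        # the event loop responsive.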

                        await self._main_loop.run_in_executor(None, self._do_cleanup_batch,
                                                              instance_name, index,
                                                              storage, only_delete_before,
                                                              total_size)
                        total_size = index.get_total_size()
                        self._logger.info(f"After cleanup batch total CAS size is {total_size} bytes.")

                self._logger.info(f"Finished cleanup. CAS size is now {total_size} bytes.")
            await asyncio.sleep(self._sleep_interval)

    def _get_last_accessed_threshold(self):
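        """Return the cutoff datetime; blobs accessed after it are protected."""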

        return datetime.utcnow() - self._only_if_unused_for
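
For reference, below is a minimal dry-run sketch of driving this service. The stub classes, instance name, and threshold values are illustrative assumptions rather than real BuildGrid backends: a deployment would pass configured storage and index objects implementing the full interface used above (is_cleanup_enabled, set_instance_name, get_total_size, mark_n_bytes_as_deleted, bulk_delete).

# Minimal dry-run sketch (illustrative only). _StubStorage and _StubIndex are
# hypothetical stand-ins implementing just enough of the interface that
# CASCleanUp touches in dry-run mode.
from datetime import timedelta


class _StubStorage:
    def is_cleanup_enabled(self):
        return True


class _StubIndex:
    def get_total_size(self):
        return 4 * 1024 ** 3  # pretend the CAS currently holds 4 GiB


cleanup = CASCleanUp(
    dry_run=True,                           # only report what would be deleted
    high_watermark=10 * 1024 ** 3,          # trigger cleanup at 10 GiB
    low_watermark=8 * 1024 ** 3,            # clean down to 8 GiB
    sleep_interval=60,                      # seconds between size checks
    batch_size=100 * 1024 ** 2,             # mark roughly 100 MiB per batch
    only_if_unused_for=timedelta(hours=2),  # protect recently accessed blobs
    storages={"example-instance": _StubStorage()},
    indexes={"example-instance": _StubIndex()},
    monitor=False,
)
cleanup.start()  # logs that 4 GiB is below the high watermark, then returns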