Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/cleanup.py: 71.93% (114 statements)

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
import threading
import time
from contextlib import ExitStack
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.metrics_names import (
    CLEANUP_BYTES_DELETED_METRIC_NAME,
    CLEANUP_BYTES_DELETION_RATE_METRIC_NAME,
    CLEANUP_INDEX_TOTAL_SIZE_BYTES_EXCLUDE_MARKED_METRIC_NAME,
    CLEANUP_INDEX_TOTAL_SIZE_BYTES_INCLUDE_MARKED_METRIC_NAME,
    CLEANUP_RUNTIME_METRIC_NAME,
)
from buildgrid.server.metrics_utils import DurationMetric, publish_counter_metric, publish_gauge_metric
from buildgrid.server.monitoring import (
    MonitoringBus,
    MonitoringOutputFormat,
    MonitoringOutputType,
    get_monitoring_bus,
    set_monitoring_bus,
)
from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker

LOGGER = logging.getLogger(__name__)


def _digests_str(digests: List[Digest]) -> str:
    return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"


class CASCleanUp:
    """A CAS cleanup service, deleting blobs from indexed storage when the
    total size exceeds a configured high watermark."""

    def __init__(
        self,
        dry_run: bool,
        high_watermark: int,
        low_watermark: int,
        sleep_interval: int,
        batch_size: int,
        only_if_unused_for: timedelta,
        indexes: Dict[str, IndexABC],
        monitor: bool,
        mon_endpoint_type: Optional[MonitoringOutputType] = None,
        mon_endpoint_location: Optional[str] = None,
        mon_serialisation_format: Optional[MonitoringOutputFormat] = None,
        mon_metric_prefix: Optional[str] = None,
    ) -> None:
        self._stack = ExitStack()
        self._dry_run = dry_run

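        # Cleanup triggers once an index's total size reaches the high
        # watermark, and deletes blobs until it drops below the low watermark.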
        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

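        # When monitoring is enabled, install a process-wide monitoring bus,
        # defaulting to StatsD serialisation over a socket endpoint.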
        if self._is_instrumented:
            set_monitoring_bus(
                MonitoringBus(
                    endpoint_type=mon_endpoint_type or MonitoringOutputType.SOCKET,
                    endpoint_location=mon_endpoint_location,
                    metric_prefix=mon_metric_prefix or "",
                    serialisation_format=mon_serialisation_format or MonitoringOutputFormat.STATSD,
                )
            )

    # --- Public API ---

    def start(self, timeout: Optional[float] = None) -> None:
        """Start the cleanup service."""
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
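        # Run the cleanup loop in a background worker, blocking until it
        # finishes or the given timeout expires.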
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stop the cleanup service."""
        LOGGER.info("Stopping Cleanup Service")
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        if self._dry_run:
            for instance_name in self._indexes.keys():
                self._calculate_cleanup(instance_name)
            return

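        # One cleanup worker per index; if any of them fail, retry the whole
        # set with exponential backoff.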
        attempts = 0
        with ContextThreadPoolExecutor(max_workers=len(self._indexes)) as ex:
            while True:
                futures = {
                    instance_name: ex.submit(self._cleanup_worker, instance_name, stop_requested)
                    for instance_name in self._indexes.keys()
                }

                failed = False
                for instance_name, future in futures.items():
                    try:
                        future.result()
                        LOGGER.info(f"CleanUp for {instance_name} completed")
                    except Exception:
                        LOGGER.exception(f"CleanUp for {instance_name} failed")
                        failed = True

                if not failed:
                    break

                # Exponential backoff before retrying
                sleep_time = 1.6**attempts
                LOGGER.info(f"Retrying Cleanup in {sleep_time} seconds")
                stop_requested.wait(timeout=sleep_time)
                attempts += 1

    def _calculate_cleanup(self, instance_name: str) -> None:
        """Work out which blobs would be deleted by the cleanup command."""
        LOGGER.info(f"Cleanup dry run for instance '{instance_name}'")
        index = self._indexes[instance_name]
        only_delete_before = self._get_last_accessed_threshold()
        total_size = index.get_total_size()
        LOGGER.info(
            f"CAS size is {total_size} bytes, compared with a high water mark of "
            f"{self._high_watermark} bytes and a low water mark of {self._low_watermark} bytes."
        )
        if total_size >= self._high_watermark:
            required_space = total_size - self._low_watermark
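            # With dry_run=True the index only reports how many bytes would be
            # freed; nothing is actually deleted.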
            cleared_space = index.delete_n_bytes(required_space, dry_run=True, protect_blobs_after=only_delete_before)
            LOGGER.info(f"{cleared_space} of the requested {required_space} bytes would be deleted.")
        else:
            LOGGER.info(f"Total size {total_size} is less than the high water mark; nothing will be deleted.")

    def _do_cleanup_batch(
        self,
        instance_name: str,
        index: IndexABC,
        only_delete_before: datetime,
        total_size: int,
        stop_requested: threading.Event,
    ) -> None:
        batch_start_time = time.time()

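        # Each batch deletes up to `self._batch_size` bytes of blobs that have
        # not been accessed since `only_delete_before`.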
        LOGGER.info(f"Deleting {self._batch_size} bytes from the index")
        bytes_deleted = index.delete_n_bytes(self._batch_size, protect_blobs_after=only_delete_before)

        if not bytes_deleted:
            err = (
                "Marked 0 digests for deletion, even though cleanup was triggered. "
                "This may be because the remaining digests have been accessed since "
                f"{only_delete_before}."
            )
            if total_size >= self._high_watermark:
                LOGGER.error(f"{err} Total size still remains greater than high watermark!")
            else:
                LOGGER.warning(err)
            stop_requested.wait(timeout=self._sleep_interval)  # Avoid a busy loop when we can't make progress
            return

        LOGGER.info(f"Bulk deleted {bytes_deleted} bytes from index")

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(
                CLEANUP_BYTES_DELETION_RATE_METRIC_NAME, bytes_deleted_per_second, {"instance-name": instance_name}
            )
            publish_counter_metric(CLEANUP_BYTES_DELETED_METRIC_NAME, bytes_deleted, {"instance-name": instance_name})

    def _cleanup_worker(self, instance_name: str, stop_requested: threading.Event) -> None:
        """Clean up an instance's index whenever it exceeds the high watermark."""
        index = self._indexes[instance_name]
        index.set_instance_name(instance_name)
        LOGGER.info(f"Cleanup for instance '{instance_name}' started.")

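        # Main loop: re-check the index size on each iteration, sleeping for
        # `self._sleep_interval` seconds between passes.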
        while not stop_requested.is_set():
            # At the start of a pass, also include any remaining delete markers
            # as part of the total size.
            total_size = index.get_total_size(include_marked=True)
            if self._is_instrumented:
                publish_gauge_metric(
                    CLEANUP_INDEX_TOTAL_SIZE_BYTES_INCLUDE_MARKED_METRIC_NAME,
                    total_size,
                    {"instance-name": instance_name},
                )

            if total_size >= self._high_watermark:
                to_delete = total_size - self._low_watermark
                LOGGER.info(
                    f"CAS size for instance '{instance_name}' is {total_size} bytes, at least "
                    f"{to_delete} bytes will be cleared."
                )
                LOGGER.info(f"Deleting items from storage/index for instance '{instance_name}'.")

                with DurationMetric(CLEANUP_RUNTIME_METRIC_NAME, instance_name, instanced=True):
                    while not stop_requested.is_set() and total_size > self._low_watermark:
                        only_delete_before = self._get_last_accessed_threshold()
                        self._do_cleanup_batch(
                            instance_name=instance_name,
                            index=index,
                            only_delete_before=only_delete_before,
                            total_size=total_size,
                            stop_requested=stop_requested,
                        )
                        # Any remaining delete markers were removed above, so
                        # base the cleanup target on new entries only.
                        total_size = index.get_total_size(include_marked=False)
                        if self._is_instrumented:
                            publish_gauge_metric(
                                CLEANUP_INDEX_TOTAL_SIZE_BYTES_EXCLUDE_MARKED_METRIC_NAME,
                                total_size,
                                {"instance-name": instance_name},
                            )

                        LOGGER.info(f"After batch, the non-stale total size is {total_size} bytes.")

                # Report the final size, including the marked-deleted items.
                total_size = index.get_total_size(include_marked=True)
                if self._is_instrumented:
                    publish_gauge_metric(
                        CLEANUP_INDEX_TOTAL_SIZE_BYTES_INCLUDE_MARKED_METRIC_NAME,
                        total_size,
                        {"instance-name": instance_name},
                    )
                LOGGER.info(f"Finished cleanup. CAS size is now {total_size} bytes.")

            stop_requested.wait(timeout=self._sleep_interval)

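    # Blobs accessed more recently than this threshold are protected from
    # deletion.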
    def _get_last_accessed_threshold(self) -> datetime:
        return datetime.utcnow() - self._only_if_unused_for
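
# A minimal usage sketch (illustrative only: `index` stands in for a concrete
# IndexABC implementation, and the watermark, interval, and batch values are
# made up rather than BuildGrid defaults):
#
#     cleanup = CASCleanUp(
#         dry_run=False,
#         high_watermark=100 * 1024**3,       # begin cleanup at 100 GiB
#         low_watermark=80 * 1024**3,         # stop once back under 80 GiB
#         sleep_interval=60,                  # seconds between size checks
#         batch_size=1024**2,                 # delete up to 1 MiB per batch
#         only_if_unused_for=timedelta(days=1),
#         indexes={"example-instance": index},
#         monitor=False,
#     )
#     cleanup.start()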