Coverage for /builds/BuildGrid/buildgrid/buildgrid/cleanup/cleanup.py: 73.83%

107 statements  

coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
import threading
import time
from contextlib import ExitStack
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.metrics_names import CLEANUP_BYTES_DELETION_RATE_METRIC_NAME, CLEANUP_RUNTIME_METRIC_NAME
from buildgrid.server.metrics_utils import DurationMetric, publish_gauge_metric
from buildgrid.server.monitoring import (
    MonitoringBus,
    MonitoringOutputFormat,
    MonitoringOutputType,
    get_monitoring_bus,
    set_monitoring_bus,
)
from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker

LOGGER = logging.getLogger(__name__)


def _digests_str(digests: List[Digest]) -> str:
    return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"
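# Illustrative example: _digests_str([Digest(size_bytes=3), Digest(size_bytes=5)])
# returns "2 digests (8 bytes)".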


class CASCleanUp:
    """CAS cleanup service: deletes blobs from the indexes when their total
    size exceeds a configured high watermark, until it falls back below the
    low watermark."""

    def __init__(
        self,
        dry_run: bool,
        high_watermark: int,
        low_watermark: int,
        sleep_interval: int,
        batch_size: int,
        only_if_unused_for: timedelta,
        indexes: Dict[str, IndexABC],
        monitor: bool,
        mon_endpoint_type: Optional[MonitoringOutputType] = None,
        mon_endpoint_location: Optional[str] = None,
        mon_serialisation_format: Optional[MonitoringOutputFormat] = None,
        mon_metric_prefix: Optional[str] = None,
    ) -> None:
        self._stack = ExitStack()
        self._dry_run = dry_run

        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

        if self._is_instrumented:
            set_monitoring_bus(
                MonitoringBus(
                    endpoint_type=mon_endpoint_type or MonitoringOutputType.SOCKET,
                    endpoint_location=mon_endpoint_location,
                    metric_prefix=mon_metric_prefix or "",
                    serialisation_format=mon_serialisation_format or MonitoringOutputFormat.STATSD,
                )
            )

    # --- Public API ---

    def start(self, timeout: Optional[float] = None) -> None:
        """Start the cleanup service."""
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stop the cleanup service."""
        LOGGER.info("Stopping Cleanup Service")
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        """Run dry-run calculations, or launch one cleanup worker per instance."""
        if self._dry_run:
            for instance_name in self._indexes.keys():
                self._calculate_cleanup(instance_name)
            return

        attempts = 0
        with ContextThreadPoolExecutor(max_workers=len(self._indexes)) as ex:
            while True:
                futures = {
                    instance_name: ex.submit(self._cleanup_worker, instance_name, stop_requested)
                    for instance_name in self._indexes.keys()
                }

                failed = False
                for instance_name, future in futures.items():
                    try:
                        future.result()
                        LOGGER.info(f"CleanUp for {instance_name} completed")
                    except Exception:
                        LOGGER.exception(f"CleanUp for {instance_name} failed")
                        failed = True

                if not failed:
                    break

                # Exponential backoff before retrying
                sleep_time = 1.6**attempts
                LOGGER.info(f"Retrying Cleanup in {sleep_time} seconds")
                stop_requested.wait(timeout=sleep_time)
                attempts += 1
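                # Note: the backoff above grows as 1.6 ** attempts with no cap,
                # i.e. waits of 1.0s, 1.6s, 2.56s, 4.096s, ... between retries.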

    def _calculate_cleanup(self, instance_name: str) -> None:
        """Work out which blobs would be deleted by the cleanup command."""

        LOGGER.info(f"Cleanup dry run for instance '{instance_name}'")
        index = self._indexes[instance_name]
        only_delete_before = self._get_last_accessed_threshold()
        total_size = index.get_total_size()
        LOGGER.info(
            f"CAS size is {total_size} bytes, compared with a high water mark of "
            f"{self._high_watermark} bytes and a low water mark of {self._low_watermark} bytes."
        )
        if total_size >= self._high_watermark:
            required_space = total_size - self._low_watermark
            cleared_space = index.delete_n_bytes(required_space, dry_run=True, protect_blobs_after=only_delete_before)
            LOGGER.info(f"{cleared_space} of the requested {required_space} bytes would be deleted.")
        else:
            LOGGER.info(f"Total size {total_size} is less than the high water mark, nothing will be deleted.")
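
    # Worked example for the watermark logic above: with a high watermark of
    # 100 GB, a low watermark of 80 GB and a CAS size of 110 GB, cleanup
    # triggers (110 >= 100) and requests deletion of 110 - 80 = 30 GB from
    # the index, protecting any blob accessed after the threshold.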

    def _do_cleanup_batch(
        self,
        instance_name: str,
        index: IndexABC,
        only_delete_before: datetime,
        total_size: int,
        stop_requested: threading.Event,
    ) -> None:
        """Delete up to one batch of bytes from the index, publishing metrics if instrumented."""
        batch_start_time = time.time()

        LOGGER.info(f"Deleting {self._batch_size} bytes from the index")
        bytes_deleted = index.delete_n_bytes(self._batch_size, protect_blobs_after=only_delete_before)

        if not bytes_deleted:
            err = (
                "Marked 0 digests for deletion, even though cleanup was triggered. "
                "This may be because the remaining digests have been accessed since "
                f"{only_delete_before}."
            )
            if total_size >= self._high_watermark:
                LOGGER.error(f"{err} Total size still remains greater than high watermark!")
            else:
                LOGGER.warning(err)
            stop_requested.wait(timeout=self._sleep_interval)  # Avoid a busy loop when we can't make progress
            return

        LOGGER.info(f"Bulk deleted {bytes_deleted} bytes from index")

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(
                CLEANUP_BYTES_DELETION_RATE_METRIC_NAME, bytes_deleted_per_second, {"instance-name": instance_name}
            )
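            # e.g. deleting 50 MiB (52428800 bytes) in a 2-second batch
            # publishes a gauge of 26214400 bytes/second, tagged with the
            # instance name.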

    def _cleanup_worker(self, instance_name: str, stop_requested: threading.Event) -> None:
        """Clean up the given instance whenever its CAS gets too full."""
        index = self._indexes[instance_name]
        index.set_instance_name(instance_name)
        LOGGER.info(f"Cleanup for instance '{instance_name}' started.")

        while not stop_requested.is_set():
            # When first starting a loop, we also include any remaining delete
            # markers as part of the total size.
            total_size = index.get_total_size(include_marked=True)
            if total_size >= self._high_watermark:
                to_delete = total_size - self._low_watermark
                LOGGER.info(
                    f"CAS size for instance '{instance_name}' is {total_size} bytes, at least "
                    f"{to_delete} bytes will be cleared."
                )
                LOGGER.info(f"Deleting items from storage/index for instance '{instance_name}'.")

                with DurationMetric(CLEANUP_RUNTIME_METRIC_NAME, instance_name, instanced=True):
                    while not stop_requested.is_set() and total_size > self._low_watermark:
                        only_delete_before = self._get_last_accessed_threshold()
                        self._do_cleanup_batch(
                            instance_name=instance_name,
                            index=index,
                            only_delete_before=only_delete_before,
                            total_size=total_size,
                            stop_requested=stop_requested,
                        )
                        # By this point any pre-existing delete markers have
                        # been cleared, so the remaining cleanup target is
                        # based on new entries only.
                        total_size = index.get_total_size(include_marked=False)
                        LOGGER.info(f"After batch, the non-stale total size is {total_size} bytes.")

                # Report the final size including the marked deleted items.
                total_size = index.get_total_size(include_marked=True)
                LOGGER.info(f"Finished cleanup. CAS size is now {total_size} bytes.")

            stop_requested.wait(timeout=self._sleep_interval)

    def _get_last_accessed_threshold(self) -> datetime:
        return datetime.utcnow() - self._only_if_unused_for
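

# Usage sketch (not part of the original module): a minimal, hypothetical way
# to wire up the service. `some_index` below is a placeholder for whichever
# IndexABC implementation a deployment uses; only CASCleanUp's own constructor
# arguments and methods are taken from the code above.
#
#     indexes = {"": some_index}
#     cleanup = CASCleanUp(
#         dry_run=False,
#         high_watermark=100 * 1024**3,   # start cleaning at 100 GiB
#         low_watermark=80 * 1024**3,     # stop once below 80 GiB
#         sleep_interval=60,              # seconds between size checks
#         batch_size=10 * 1024**3,        # bytes to delete per batch
#         only_if_unused_for=timedelta(days=7),
#         indexes=indexes,
#         monitor=False,
#     )
#     try:
#         cleanup.start()
#     finally:
#         cleanup.stop()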