Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cleanup/cleanup.py: 71.93%

114 statements  

coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2020 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import threading
import time
from contextlib import ExitStack
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.index.index_abc import IndexABC
from buildgrid.server.context import instance_context
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.metrics_utils import publish_counter_metric, publish_gauge_metric, timer
from buildgrid.server.monitoring import get_monitoring_bus
from buildgrid.server.threading import ContextThreadPoolExecutor, ContextWorker

LOGGER = buildgrid_logger(__name__)


def _digests_str(digests: List[Digest]) -> str:
    return f"{len(digests)} digests ({sum(d.size_bytes for d in digests)} bytes)"
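# Example (illustrative): _digests_str([Digest(size_bytes=512), Digest(size_bytes=512)])
# evaluates to "2 digests (1024 bytes)".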


class CASCleanUp:
    """Creates a CAS cleanup service."""

    def __init__(
        self,
        dry_run: bool,
        high_watermark: int,
        low_watermark: int,
        sleep_interval: int,
        batch_size: int,
        only_if_unused_for: timedelta,
        indexes: Dict[str, IndexABC],
        monitor: bool,
    ) -> None:
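        # Overall policy (see _cleanup_worker below): once an index reaches
        # high_watermark bytes, delete blobs in batch_size chunks until its
        # total size drops below low_watermark, protecting any blob accessed
        # within the last only_if_unused_for.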

        self._stack = ExitStack()
        self._dry_run = dry_run

        self._high_watermark = high_watermark
        self._low_watermark = low_watermark
        self._batch_size = batch_size
        self._only_if_unused_for = only_if_unused_for

        self._indexes = indexes

        self._is_instrumented = monitor

        self._sleep_interval = sleep_interval

    # --- Public API ---

    def start(self, timeout: Optional[float] = None) -> None:
        """Start the cleanup service."""
        if self._is_instrumented:
            self._stack.enter_context(get_monitoring_bus())
        worker = self._stack.enter_context(ContextWorker(self._begin_cleanup, "CleanUpLauncher"))
        worker.wait(timeout=timeout)

    def stop(self, *args: Any, **kwargs: Any) -> None:
        """Stop the cleanup service."""
        LOGGER.info("Stopping Cleanup Service.")
        self._stack.close()

    def _begin_cleanup(self, stop_requested: threading.Event) -> None:
        if self._dry_run:
            for instance_name in self._indexes.keys():
                self._calculate_cleanup(instance_name)
            return

        attempts = 0
        with ContextThreadPoolExecutor(max_workers=len(self._indexes)) as ex:
            while True:
                futures = {
                    instance_name: ex.submit(self._cleanup_worker, instance_name, stop_requested)
                    for instance_name in self._indexes.keys()
                }

                failed = False
                for instance_name, future in futures.items():
                    try:
                        future.result()
                        LOGGER.info("Cleanup completed.", tags=dict(instance_name=instance_name))
                    except Exception:
                        LOGGER.exception("Cleanup failed.", tags=dict(instance_name=instance_name))
                        failed = True

                if not failed:
                    break

                # Exponential backoff before retrying: 1.6**attempts seconds
                # (~1.0s, 1.6s, 2.56s, ...), growing while failures persist.
                sleep_time = 1.6**attempts
                LOGGER.info("Retrying Cleanup after delay...", tags=dict(sleep_time_seconds=sleep_time))
                stop_requested.wait(timeout=sleep_time)
                attempts += 1
                continue

    def _calculate_cleanup(self, instance_name: str) -> None:
        """Work out which blobs would be deleted by the cleanup command."""
        with instance_context(instance_name):
            LOGGER.info("Cleanup dry run.", tags=dict(instance_name=instance_name))
            index = self._indexes[instance_name]
            only_delete_before = self._get_last_accessed_threshold()
            total_size = index.get_total_size()
            LOGGER.info(
                "Calculated CAS size.",
                tags=dict(
                    total_size=total_size,
                    high_watermark_bytes=self._high_watermark,
                    low_watermark_bytes=self._low_watermark,
                ),
            )
            if total_size >= self._high_watermark:
                required_space = total_size - self._low_watermark
                cleared_space = index.delete_n_bytes(
                    required_space, dry_run=True, protect_blobs_after=only_delete_before
                )
                LOGGER.info(f"Determined {cleared_space} of the requested {required_space} bytes would be deleted.")
            else:
                LOGGER.info(f"Total size {total_size} is less than the high watermark; nothing will be deleted.")

    def _do_cleanup_batch(
        self,
        instance_name: str,
        index: IndexABC,
        only_delete_before: datetime,
        total_size: int,
        stop_requested: threading.Event,
    ) -> None:
        batch_start_time = time.time()

        LOGGER.info("Deleting bytes from the index.", tags=dict(batch_size=self._batch_size))
        bytes_deleted = index.delete_n_bytes(self._batch_size, protect_blobs_after=only_delete_before)

        if not bytes_deleted:
            err = (
                "Marked 0 digests for deletion, even though cleanup was triggered. "
                "This may be because the remaining digests have been accessed more "
                f"recently than {only_delete_before}."
            )
            if total_size >= self._high_watermark:
                LOGGER.error(f"{err} Total size remains above the high watermark!")
            else:
                LOGGER.warning(err)
            stop_requested.wait(timeout=self._sleep_interval)  # Avoid a busy loop when we can't make progress
            return

        LOGGER.info("Bulk deleted bytes from index.", tags=dict(bytes_deleted=bytes_deleted))

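        # When monitoring is enabled, publish per-batch deletion throughput
        # (a gauge) and the cumulative count of bytes deleted (a counter).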

        if self._is_instrumented:
            batch_duration = time.time() - batch_start_time
            bytes_deleted_per_second = bytes_deleted / batch_duration
            publish_gauge_metric(METRIC.CLEANUP.BYTES_DELETED_PER_SECOND, bytes_deleted_per_second)
            publish_counter_metric(METRIC.CLEANUP.BYTES_DELETED_COUNT, bytes_deleted)

    def _cleanup_worker(self, instance_name: str, stop_requested: threading.Event) -> None:
        """Clean up an instance's CAS when it is full."""
        with instance_context(instance_name):
            index = self._indexes[instance_name]
            LOGGER.info("Cleanup started.", tags=dict(instance_name=instance_name))

            while not stop_requested.is_set():
                # At the start of each pass, the total size also includes any
                # remaining delete markers.
                total_size = index.get_total_size()
                self.publish_total_size_metric(total_size)

                if total_size >= self._high_watermark:
                    to_delete = total_size - self._low_watermark
                    LOGGER.info(
                        "High watermark exceeded. Deleting items from storage/index.",
                        tags=dict(total_bytes=total_size, min_bytes_to_delete=to_delete, instance_name=instance_name),
                    )

                    with timer(METRIC.CLEANUP.DURATION):
                        while not stop_requested.is_set() and total_size > self._low_watermark:
                            only_delete_before = self._get_last_accessed_threshold()
                            with timer(METRIC.CLEANUP.BATCH_DURATION):
                                self._do_cleanup_batch(
                                    instance_name=instance_name,
                                    index=index,
                                    only_delete_before=only_delete_before,
                                    total_size=total_size,
                                    stop_requested=stop_requested,
                                )
                            total_size = index.get_total_size()
                            self.publish_total_size_metric(total_size)
                            LOGGER.info("Finished cleanup batch.", tags=dict(non_stale_total_bytes=total_size))

                    LOGGER.info("Finished cleanup.", tags=dict(total_bytes=total_size))

                stop_requested.wait(timeout=self._sleep_interval)

    def _get_last_accessed_threshold(self) -> datetime:
        return datetime.utcnow() - self._only_if_unused_for

    def publish_total_size_metric(self, total_size: int) -> None:
        if self._is_instrumented:
            publish_gauge_metric(METRIC.CLEANUP.TOTAL_BYTES_COUNT, total_size)
            publish_gauge_metric(METRIC.CLEANUP.LOW_WATERMARK_BYTES_COUNT, self._low_watermark)
            publish_gauge_metric(METRIC.CLEANUP.HIGH_WATERMARK_BYTES_COUNT, self._high_watermark)
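

# --- Illustrative usage (sketch, not part of the original module) ---
# Assumes a concrete IndexABC implementation bound to the instance name
# "main"; the watermark and interval values below are made up for the
# example.
#
#     index: IndexABC = ...  # provided by the storage configuration
#     cleanup = CASCleanUp(
#         dry_run=False,
#         high_watermark=100 * 1024**3,           # start cleanup at 100 GiB
#         low_watermark=80 * 1024**3,             # stop once below 80 GiB
#         sleep_interval=60,                      # seconds between size checks
#         batch_size=1024**2,                     # delete up to 1 MiB per batch
#         only_if_unused_for=timedelta(hours=6),  # protect recently used blobs
#         indexes={"main": index},
#         monitor=False,
#     )
#     cleanup.start()  # blocks until the service stops or `timeout` elapses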