Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/mem/impl.py: 79.09%

220 statements  

coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import bisect
import logging
from multiprocessing import Queue
from threading import Condition, Lock, Thread
from typing import List, Tuple
from datetime import datetime
import time

from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._enums import LeaseState, MetricCategories, OperationStage
from buildgrid._exceptions import InvalidArgumentError
from buildgrid.utils import JobState, BrowserURL
from buildgrid.settings import MAX_JOB_BLOCK_TIME
from buildgrid.server.metrics_names import (
    BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME,
    DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME,
    DATA_STORE_CREATE_JOB_TIME_METRIC_NAME,
    DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME,
    DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME,
    DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME,
    DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME
)
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.operations.filtering import OperationFilter, DEFAULT_OPERATION_FILTERS
from buildgrid.server.persistence.interface import DataStoreInterface


class MemoryDataStore(DataStoreInterface):

    def __init__(self, storage):
        super().__init__(storage)
        self.logger = logging.getLogger(__file__)
        self.logger.info("Creating in-memory scheduler")

        self.queue = []
        self.queue_lock = Lock()
        self.queue_condition = Condition(lock=self.queue_lock)

        self.jobs_by_action = {}
        self.jobs_by_operation = {}
        self.jobs_by_name = {}

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

        self.update_event_queue = Queue()
        self.watcher = Thread(name="JobWatcher", target=self.wait_for_job_updates, daemon=True)
        self.watcher_keep_running = True
        self.watcher.start()

    def __repr__(self):
        return "In-memory data store interface"
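
    # While monitoring is active, operations_by_stage maps each OperationStage
    # to the set of job names currently at that stage, and leases_by_state maps
    # each LeaseState to the set of lease ids in that state.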

    def activate_monitoring(self):
        if self.is_instrumented:
            return

        self.operations_by_stage = {
            stage: set() for stage in OperationStage
        }
        self.leases_by_state = {
            state: set() for state in LeaseState
        }
        self.is_instrumented = True

    def deactivate_monitoring(self):
        if not self.is_instrumented:
            return

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

    def _check_job_timeout(self, job_internal, *, max_execution_timeout=None):
        """Do a lazy check of the maximum allowed job timeout when a client tries
        to retrieve an existing job.

        If we detect on access that the job has exceeded the timeout, cancel the
        job and its related operations/leases.

        Returns the `buildgrid.server.Job` object, possibly updated with
        `cancelled=True`.
        """
        if job_internal and max_execution_timeout and job_internal.worker_start_timestamp_as_datetime:
            if job_internal.operation_stage == OperationStage.EXECUTING:
                executing_duration = datetime.utcnow() - job_internal.worker_start_timestamp_as_datetime
                if executing_duration.total_seconds() >= max_execution_timeout:
                    self.logger.warning(f"Job=[{job_internal}] has been executing for "
                                        f"executing_duration=[{executing_duration}]. "
                                        f"max_execution_timeout=[{max_execution_timeout}] "
                                        "Cancelling.")
                    job_internal.cancel_all_operations(data_store=self)
                    self.logger.info(f"Job=[{job_internal}] has been cancelled.")
        return job_internal

    @DurationMetric(DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME, instanced=True)
    def get_job_by_name(self, name, *, max_execution_timeout=None):
        job = self.jobs_by_name.get(name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    @DurationMetric(DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME, instanced=True)
    def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
        job = self.jobs_by_action.get(action_digest.hash)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    @DurationMetric(DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME, instanced=True)
    def get_job_by_operation(self, operation_name, *, max_execution_timeout=None):
        job = self.jobs_by_operation.get(operation_name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    def get_all_jobs(self):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage != OperationStage.COMPLETED]

    def get_jobs_by_stage(self, operation_stage):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage == operation_stage]

    def _get_job_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_jobs_by_stage(stage))))
        return results

    @DurationMetric(DATA_STORE_CREATE_JOB_TIME_METRIC_NAME, instanced=True)
    def create_job(self, job):
        self.jobs_by_action[job.action_digest.hash] = job
        self.jobs_by_name[job.name] = job
        if self._action_browser_url is not None:
            job.set_action_url(BrowserURL(self._action_browser_url, self._instance_name))

    @DurationMetric(DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME, instanced=True)
    def queue_job(self, job_name):
        job = self.jobs_by_name[job_name]
        with self.queue_condition:
            if job.operation_stage != OperationStage.QUEUED:
                bisect.insort(self.queue, job)
                self.logger.info(f"Job queued: [{job.name}]")
                # Wake all waiters as not all waiters may have the required capabilities
                self.queue_condition.notify_all()
            else:
                self.logger.info(f"Job already queued: [{job.name}]")
                self.queue.sort()
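
    # Note: bisect.insort() above relies on Job implementing rich comparison,
    # so self.queue stays ordered and assign_lease_for_next_job() can hand out
    # the highest priority runnable job first.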

    @DurationMetric(DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME, instanced=True)
    def update_job(self, job_name, changes, skip_notify=False):
        # With this implementation, there's no need to actually make changes
        # to the stored job, since it's a reference to the in-memory job that
        # caused this method to be called.
        self.update_event_queue.put((job_name, changes, skip_notify))

    def delete_job(self, job_name):
        job = self.jobs_by_name[job_name]

        del self.jobs_by_action[job.action_digest.hash]
        del self.jobs_by_name[job.name]

        self.logger.info(f"Job deleted: [{job.name}]")

        if self.is_instrumented:
            for stage in OperationStage:
                self.operations_by_stage[stage].discard(job.name)

            for state in LeaseState:
                self.leases_by_state[state].discard(job.name)
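
    # wait_for_job_updates() runs on the "JobWatcher" thread started in
    # __init__ and consumes the (job_name, changes, skip_notify) tuples that
    # update_job() puts onto update_event_queue.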

    def wait_for_job_updates(self):
        self.logger.info("Starting job watcher thread")
        while self.watcher_keep_running:
            try:
                job_name, changes, skip_notify = self.update_event_queue.get()
            except EOFError:
                continue
            with DurationMetric(DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME):
                with self.watched_jobs_lock:
                    if (all(field not in changes for field in ("cancelled", "stage")) or
                            job_name not in self.watched_jobs):
                        # If the stage or cancellation state haven't changed, we don't
                        # need to do anything with this event. Similarly, if we aren't
                        # watching this job, we can ignore the event.
                        continue
                    job = self.get_job_by_name(job_name)
                    spec = self.watched_jobs[job_name]
                    new_state = JobState(job)
                    if spec.last_state != new_state and not skip_notify:
                        spec.last_state = new_state
                        spec.event.notify_change()

    def store_response(self, job, commit_changes):
        # The job is always in memory in this implementation, so there's
        # no need to write anything to the CAS, since the job stays in
        # memory as long as we need it.
        pass

    def get_operations_by_stage(self, operation_stage):
        return self.operations_by_stage.get(operation_stage, set())

    def _get_operation_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_operations_by_stage(stage))))
        return results

    @DurationMetric(DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME, instanced=True)
    def list_operations(self,
                        operation_filters: List[OperationFilter]=None,
                        page_size: int=None,
                        page_token: str=None,
                        max_execution_timeout: int=None) -> Tuple[List[operations_pb2.Operation], str]:

        if operation_filters and operation_filters != DEFAULT_OPERATION_FILTERS:
            raise InvalidArgumentError("Filtering is not supported with the in-memory scheduler.")

        if page_token:
            raise InvalidArgumentError("page_token is not supported in the in-memory scheduler.")

        # Run through all the jobs and see if any are exceeding the
        # execution timeout; mark those as cancelled
        for job in self.jobs_by_name.values():
            self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

        # Return all operations
        return [
            operation for job in self.jobs_by_name.values() for operation in job.get_all_operations()
        ], ""

    @DurationMetric(DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME, instanced=True)
    def create_operation(self, operation_name, job_name, request_metadata=None):
        job = self.jobs_by_name[job_name]
        self.jobs_by_operation[operation_name] = job
        if self.is_instrumented:
            self.operations_by_stage[job.operation_stage].add(job_name)

    @DurationMetric(DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME, instanced=True)
    def update_operation(self, operation_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_operation[operation_name]
            self.operations_by_stage[job.operation_stage].add(job.name)
            other_stages = [member for member in OperationStage if member != job.operation_stage]
            for stage in other_stages:
                self.operations_by_stage[stage].discard(job.name)

    def delete_operation(self, operation_name):
        del self.jobs_by_operation[operation_name]

    def get_leases_by_state(self, lease_state):
        return self.leases_by_state.get(lease_state, set())

    def _get_lease_count_by_state(self):
        results = []
        for state in LeaseState:
            results.append((state, len(self.get_leases_by_state(state))))
        return results

    @DurationMetric(DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def create_lease(self, lease):
        if self.is_instrumented:
            self.leases_by_state[LeaseState(lease.state)].add(lease.id)

    @DurationMetric(DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def update_lease(self, job_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_name[job_name]
            state = LeaseState(job.lease.state)
            self.leases_by_state[state].add(job.lease.id)
            other_states = [member for member in LeaseState if member != state]
            for state in other_states:
                self.leases_by_state[state].discard(job.lease.id)

    def load_unfinished_jobs(self):
        return []

    def get_operation_request_metadata_by_name(self, operation_name):
        return None

    @DurationMetric(BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME, instanced=True)
    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
        """Assign a lease for the highest priority job that a worker can run.

        Iterate over the job queue and find the highest priority job which
        the worker can run given the provided capabilities.

        :param capabilities: Dictionary of worker capabilities to compare
            with job requirements when finding a job.
        :type capabilities: dict
        :param callback: Function to run on the next runnable job; it should
            return a list of leases.
        :type callback: function
        :param timeout: Time to block waiting on the job queue, capped at
            MAX_JOB_BLOCK_TIME if longer.
        :type timeout: int
        :returns: A list of leases.

        """
        if not timeout and not self.queue:
            return []

        with self.queue_condition:
            leases = self._assign_lease(capabilities, callback)

            if timeout:
                # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
                timeout = min(timeout, MAX_JOB_BLOCK_TIME)
                deadline = time.time() + timeout
                while not leases and time.time() < deadline:
                    ready = self.queue_condition.wait(timeout=deadline - time.time())
                    if not ready:
                        # If we ran out of time waiting for the condition variable,
                        # give up early.
                        break
                    leases = self._assign_lease(capabilities, callback, deadline=deadline)

            return leases
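
    # Illustrative usage (hypothetical caller, not part of this module):
    # assuming `worker_caps` is a capabilities dict such as
    # {"OSFamily": {"linux"}} and `create_leases(job)` returns a list of
    # lease messages for `job`, a bots service could call:
    #
    #     leases = data_store.assign_lease_for_next_job(
    #         worker_caps, create_leases, timeout=5)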

    def _assign_lease(self, worker_capabilities, callback, deadline=None):
        for index, job in enumerate(self.queue):
            if deadline is not None and time.time() >= deadline:
                break
            # Drop cancelled jobs from the queue; they would be unable to get a lease anyway
            if job.cancelled:
                self.logger.debug(f"Dropping cancelled job: [{job.name}] from queue")
                del self.queue[index]
                continue

            if self._worker_is_capable(worker_capabilities, job):
                leases = callback(job)
                if leases:
                    del self.queue[index]
                    return leases
        return []

    def _worker_is_capable(self, worker_capabilities, job):
        """Returns whether the worker is suitable to run the job."""
        # TODO: Replace this with the logic defined in the Platform msg. standard.

        job_requirements = job.platform_requirements
        # For now we'll only check OS and ISA properties.

        if not job_requirements:
            return True

        for req, matches in job_requirements.items():
            if not matches <= worker_capabilities.get(req, set()):
                return False
        return True
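
    # Example for _worker_is_capable (illustrative property names and values):
    # a job requiring {"OSFamily": {"linux"}, "ISA": {"x86-64"}} can be run by
    # a worker advertising {"OSFamily": {"linux"}, "ISA": {"x86-64", "x86-32"}},
    # since every required value set is a subset of the worker's corresponding set.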

    def get_metrics(self):
        metrics = {}
        metrics[MetricCategories.JOBS.value] = {
            stage.value: count for stage, count in self._get_job_count_by_stage()
        }
        metrics[MetricCategories.LEASES.value] = {
            state.value: count for state, count in self._get_lease_count_by_state()
        }

        return metrics
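

# Usage sketch (hypothetical; how the data store is constructed and wired into
# the scheduler is configured elsewhere in BuildGrid, this is only illustrative):
#
#     data_store = MemoryDataStore(storage)
#     data_store.activate_monitoring()
#     ...
#     metrics = data_store.get_metrics()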