Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/mem/impl.py: 80.09%

221 statements  

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import bisect
import logging
from multiprocessing import Queue
from threading import Lock, Thread
from typing import List, Tuple
from datetime import datetime
import time

from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._enums import LeaseState, MetricCategories, OperationStage
from buildgrid._exceptions import InvalidArgumentError
from buildgrid.utils import Condition, JobState, BrowserURL
from buildgrid.settings import MAX_JOB_BLOCK_TIME
from buildgrid.server.metrics_names import (
    BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME,
    DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME,
    DATA_STORE_CREATE_JOB_TIME_METRIC_NAME,
    DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME,
    DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME,
    DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME,
    DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME
)
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.operations.filtering import OperationFilter, DEFAULT_OPERATION_FILTERS
from buildgrid.server.persistence.interface import DataStoreInterface


class MemoryDataStore(DataStoreInterface):

    def __init__(self, storage):
        super().__init__(storage)
        self.logger = logging.getLogger(__file__)
        self.logger.info("Creating in-memory scheduler")

        self.queue = []
        self.queue_lock = Lock()
        self.queue_condition = Condition(lock=self.queue_lock)

        self.jobs_by_action = {}
        self.jobs_by_operation = {}
        self.jobs_by_name = {}

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

        self.update_event_queue = Queue()
        self.watcher = Thread(name="JobWatcher", target=self.wait_for_job_updates, daemon=True)
        self.watcher_keep_running = True
        self.watcher.start()

    def __repr__(self):
        return "In-memory data store interface"

    def activate_monitoring(self):
        if self.is_instrumented:
            return

        self.operations_by_stage = {
            stage: set() for stage in OperationStage
        }
        self.leases_by_state = {
            state: set() for state in LeaseState
        }
        self.is_instrumented = True

    def deactivate_monitoring(self):
        if not self.is_instrumented:
            return

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

    def _check_job_timeout(self, job_internal, *, max_execution_timeout=None):
        """ Do a lazy check of maximum allowed job timeouts when clients try to retrieve
            an existing job.
            Cancel the job and related operations/leases, if we detect they have
            exceeded timeouts on access.

            Returns the `buildgrid.server.Job` object, possibly updated with `cancelled=True`.
        """
        if job_internal and max_execution_timeout and job_internal.worker_start_timestamp_as_datetime:
            if job_internal.operation_stage == OperationStage.EXECUTING:
                executing_duration = datetime.utcnow() - job_internal.worker_start_timestamp_as_datetime
                if executing_duration.total_seconds() >= max_execution_timeout:
                    self.logger.warning(f"Job=[{job_internal}] has been executing for "
                                        f"executing_duration=[{executing_duration}]. "
                                        f"max_execution_timeout=[{max_execution_timeout}] "
                                        "Cancelling.")
                    job_internal.cancel_all_operations(data_store=self)
                    self.logger.info(f"Job=[{job_internal}] has been cancelled.")
        return job_internal
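    # Editorial sketch (values are illustrative, not from the original source): if a job
    # entered the EXECUTING stage at 10:00 UTC and a client fetches it at 11:30 with
    # max_execution_timeout=3600, the elapsed 5400 seconds exceed the timeout, so the job
    # and its operations/leases are cancelled before being returned. The check only runs
    # when a job is accessed; this data store has no background timeout scan.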

    @DurationMetric(DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME, instanced=True)
    def get_job_by_name(self, name, *, max_execution_timeout=None):
        job = self.jobs_by_name.get(name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    @DurationMetric(DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME, instanced=True)
    def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
        job = self.jobs_by_action.get(action_digest.hash)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    @DurationMetric(DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME, instanced=True)
    def get_job_by_operation(self, operation_name, *, max_execution_timeout=None):
        job = self.jobs_by_operation.get(operation_name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    def get_all_jobs(self):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage != OperationStage.COMPLETED]

    def get_jobs_by_stage(self, operation_stage):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage == operation_stage]

    def _get_job_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_jobs_by_stage(stage))))
        return results

    @DurationMetric(DATA_STORE_CREATE_JOB_TIME_METRIC_NAME, instanced=True)
    def create_job(self, job):
        self.jobs_by_action[job.action_digest.hash] = job
        self.jobs_by_name[job.name] = job
        if self._action_browser_url is not None:
            job.set_action_url(BrowserURL(self._action_browser_url, self._instance_name))

    @DurationMetric(DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME, instanced=True)
    def queue_job(self, job_name):
        job = self.jobs_by_name[job_name]
        with self.queue_condition:
            if job.operation_stage != OperationStage.QUEUED:
                bisect.insort(self.queue, job)
                self.logger.info(f"Job queued: [{job.name}]")
            else:
                self.logger.info(f"Job already queued: [{job.name}]")
                self.queue.sort()
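    # Editorial note: bisect.insort keeps the queue in priority order, which assumes Job
    # objects are orderable (presumably by priority in the upstream Job class). Re-queueing
    # a job that is already QUEUED therefore only re-sorts the queue (for example after a
    # priority change) instead of inserting a duplicate entry.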

    @DurationMetric(DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME, instanced=True)
    def update_job(self, job_name, changes, skip_notify=False):
        # With this implementation, there's no need to actually make
        # changes to the stored job, since it's a reference to the
        # in-memory job that caused this method to be called.
        self.update_event_queue.put((job_name, changes, skip_notify))

    def delete_job(self, job_name):
        job = self.jobs_by_name[job_name]

        del self.jobs_by_action[job.action_digest.hash]
        del self.jobs_by_name[job.name]

        self.logger.info(f"Job deleted: [{job.name}]")

        if self.is_instrumented:
            for stage in OperationStage:
                self.operations_by_stage[stage].discard(job.name)

            for state in LeaseState:
                self.leases_by_state[state].discard(job.name)

    def wait_for_job_updates(self):
        self.logger.info("Starting job watcher thread")
        while self.watcher_keep_running:
            try:
                job_name, changes, skip_notify = self.update_event_queue.get()
            except EOFError:
                continue
            with DurationMetric(DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME):
                with self.watched_jobs_lock:
                    if (all(field not in changes for field in ("cancelled", "stage")) or
                            job_name not in self.watched_jobs):
                        # If the stage or cancellation state haven't changed, we don't
                        # need to do anything with this event. Similarly, if we aren't
                        # watching this job, we can ignore the event.
                        continue
                    job = self.get_job_by_name(job_name)
                    spec = self.watched_jobs[job_name]
                    new_state = JobState(job)
                    if spec.last_state != new_state:
                        spec.last_state = new_state
                        if not skip_notify:
                            spec.event.notify_change()
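    # Editorial sketch (the job name is hypothetical): update_job() enqueues tuples of the
    # form
    #
    #     ("job-7f3a9c", {"stage": OperationStage.COMPLETED}, False)
    #
    # i.e. (job_name, changes, skip_notify). Only events whose changes include "stage" or
    # "cancelled" for a job that is currently being watched lead to watchers being notified.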

    def store_response(self, job, commit_changes):
        # The job is always in memory in this implementation, so there's
        # no need to write anything to the CAS, since the job stays in
        # memory as long as we need it.
        pass

    def get_operations_by_stage(self, operation_stage):
        return self.operations_by_stage.get(operation_stage, set())

    def _get_operation_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_operations_by_stage(stage))))
        return results

    @DurationMetric(DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME, instanced=True)
    def list_operations(self,
                        operation_filters: List[OperationFilter] = None,
                        page_size: int = None,
                        page_token: str = None,
                        max_execution_timeout: int = None) -> Tuple[List[operations_pb2.Operation], str]:

        if operation_filters and operation_filters != DEFAULT_OPERATION_FILTERS:
            raise InvalidArgumentError("Filtering is not supported with the in-memory scheduler.")

        if page_token:
            raise InvalidArgumentError("page_token is not supported in the in-memory scheduler.")

        # Run through all the jobs and see if any of them are
        # exceeding the execution timeout; mark those as cancelled
        for job in self.jobs_by_name.values():
            self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

        # Return all operations
        return [
            operation for job in self.jobs_by_name.values() for operation in job.get_all_operations()
        ], ""

    @DurationMetric(DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME, instanced=True)
    def create_operation(self, operation_name, job_name, request_metadata=None):
        job = self.jobs_by_name[job_name]
        self.jobs_by_operation[operation_name] = job
        if self.is_instrumented:
            self.operations_by_stage[job.operation_stage].add(job_name)

    @DurationMetric(DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME, instanced=True)
    def update_operation(self, operation_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_operation[operation_name]
            self.operations_by_stage[job.operation_stage].add(job.name)
            other_stages = [member for member in OperationStage if member != job.operation_stage]
            for stage in other_stages:
                self.operations_by_stage[stage].discard(job.name)

    def delete_operation(self, operation_name):
        del self.jobs_by_operation[operation_name]

    def get_leases_by_state(self, lease_state):
        return self.leases_by_state.get(lease_state, set())

    def _get_lease_count_by_state(self):
        results = []
        for state in LeaseState:
            results.append((state, len(self.get_leases_by_state(state))))
        return results

    @DurationMetric(DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def create_lease(self, lease):
        if self.is_instrumented:
            self.leases_by_state[LeaseState(lease.state)].add(lease.id)

    @DurationMetric(DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def update_lease(self, job_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_name[job_name]
            state = LeaseState(job.lease.state)
            self.leases_by_state[state].add(job.lease.id)
            other_states = [member for member in LeaseState if member != state]
            for state in other_states:
                self.leases_by_state[state].discard(job.lease.id)

    def load_unfinished_jobs(self):
        return []

    def get_operation_request_metadata_by_name(self, operation_name):
        return None

    @DurationMetric(BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME, instanced=True)
    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
        """Assign a lease for the highest priority job that a worker can run.

        Iterate over the job queue and find the highest priority job which
        the worker can run given the provided capabilities. Takes a
        dictionary of worker capabilities to compare with job requirements.

        :param capabilities: Dictionary of worker capabilities to compare
            with job requirements when finding a job.
        :type capabilities: dict
        :param callback: Function to run on the next runnable job; should return
            a list of leases.
        :type callback: function
        :param timeout: Maximum time to block waiting on the job queue; capped at
            MAX_JOB_BLOCK_TIME if longer.
        :type timeout: int
        :returns: A list of leases for the assigned job, or an empty list if no
            suitable job was found.

        """
        if not timeout and not self.queue:
            return []

        with self.queue_condition:
            leases = self._assign_lease(capabilities, callback)

            self.queue_condition.notify()

            if timeout:
                # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
                timeout = min(timeout, MAX_JOB_BLOCK_TIME)
                deadline = time.time() + timeout
                while not leases and time.time() < deadline:
                    ready = self.queue_condition.wait(timeout=deadline - time.time())
                    if not ready:
                        # If we ran out of time waiting for the condition variable,
                        # give up early.
                        break
                    leases = self._assign_lease(capabilities, callback, deadline=deadline)
                    self.queue_condition.notify()

        return leases
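    # Editorial sketch of a call site (all names below are hypothetical, not from the
    # original source):
    #
    #     capabilities = {"OSFamily": {"linux"}, "ISA": {"x86-64"}}
    #
    #     def try_lease(job):
    #         # create and return a list with one lease for `job`, or [] to decline
    #         ...
    #
    #     leases = data_store.assign_lease_for_next_job(capabilities, try_lease, timeout=5)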

    def _assign_lease(self, worker_capabilities, callback, deadline=None):
        for index, job in enumerate(self.queue):
            if deadline is not None and time.time() >= deadline:
                break
            # Drop cancelled jobs from the queue; they would be unable to get a lease anyway
            if job.cancelled:
                self.logger.debug(f"Dropping cancelled job: [{job.name}] from queue")
                del self.queue[index]
                continue

            if self._worker_is_capable(worker_capabilities, job):
                leases = callback(job)
                if leases:
                    del self.queue[index]
                    return leases
        return []

    def _worker_is_capable(self, worker_capabilities, job):
        """Returns whether the worker is suitable to run the job."""
        # TODO: Replace this with the logic defined in the Platform msg. standard.

        job_requirements = job.platform_requirements
        # For now we'll only check OS and ISA properties.

        if not job_requirements:
            return True

        for req, matches in job_requirements.items():
            if not matches <= worker_capabilities.get(req, set()):
                return False
        return True
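    # Worked example (values are illustrative): a worker advertising
    #     {"OSFamily": {"linux"}, "ISA": {"x86-64", "x86-32"}}
    # can take a job requiring {"OSFamily": {"linux"}, "ISA": {"x86-64"}}, since each
    # required value set is a subset (<=) of the worker's corresponding set, but not a job
    # requiring {"OSFamily": {"aix"}}.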

    def get_metrics(self):
        metrics = {}
        metrics[MetricCategories.JOBS.value] = {
            stage.value: count for stage, count in self._get_job_count_by_stage()
        }
        metrics[MetricCategories.LEASES.value] = {
            state.value: count for state, count in self._get_lease_count_by_state()
        }

        return metrics
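    # Editorial sketch of the returned structure (counts are illustrative):
    #
    #     {
    #         MetricCategories.JOBS.value: {OperationStage.QUEUED.value: 3, ...},
    #         MetricCategories.LEASES.value: {LeaseState.ACTIVE.value: 1, ...},
    #     }
    #
    # with one entry per OperationStage under the jobs category and one per LeaseState
    # under the leases category.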