# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import bisect
import logging
from multiprocessing import Queue
from threading import Lock, Thread
from typing import List, Tuple
from datetime import datetime
import time

from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid._enums import LeaseState, MetricCategories, OperationStage
from buildgrid._exceptions import InvalidArgumentError
from buildgrid.utils import Condition, JobState, BrowserURL
from buildgrid.settings import MAX_JOB_BLOCK_TIME
from buildgrid.server.metrics_names import (
    BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME,
    DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME,
    DATA_STORE_CREATE_JOB_TIME_METRIC_NAME,
    DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME,
    DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME,
    DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME,
    DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME,
    DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME,
    DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME
)
from buildgrid.server.metrics_utils import DurationMetric
from buildgrid.server.operations.filtering import OperationFilter, DEFAULT_OPERATION_FILTERS
from buildgrid.server.persistence.interface import DataStoreInterface


class MemoryDataStore(DataStoreInterface):

    def __init__(self, storage):
        super().__init__()
        self.logger = logging.getLogger(__file__)
        self.logger.info("Creating in-memory scheduler")

        self.queue = []
        self.queue_lock = Lock()
        self.queue_condition = Condition(lock=self.queue_lock)

        self.jobs_by_action = {}
        self.jobs_by_operation = {}
        self.jobs_by_name = {}

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

        self.storage = storage
        self.update_event_queue = Queue()
        self.watcher = Thread(name="JobWatcher", target=self.wait_for_job_updates, daemon=True)
        self.watcher_keep_running = True
        self.watcher.start()

    def __repr__(self):
        return "In-memory data store interface"

    def activate_monitoring(self):
        if self.is_instrumented:
            return

        self.operations_by_stage = {
            stage: set() for stage in OperationStage
        }
        self.leases_by_state = {
            state: set() for state in LeaseState
        }
        self.is_instrumented = True

    def deactivate_monitoring(self):
        if not self.is_instrumented:
            return

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

    def _check_job_timeout(self, job_internal, *, max_execution_timeout=None):
        """ Do a lazy check of maximum allowed job timeouts when clients try to retrieve
        an existing job.
        Cancel the job and related operations/leases, if we detect they have
        exceeded timeouts on access.

        Returns the `buildgrid.server.Job` object, possibly updated with `cancelled=True`.
        """
        if job_internal and max_execution_timeout and job_internal.worker_start_timestamp_as_datetime:
            if job_internal.operation_stage == OperationStage.EXECUTING:
                executing_duration = datetime.utcnow() - job_internal.worker_start_timestamp_as_datetime
                if executing_duration.total_seconds() >= max_execution_timeout:
                    self.logger.warning(f"Job=[{job_internal}] has been executing for "
                                        f"executing_duration=[{executing_duration}]. "
                                        f"max_execution_timeout=[{max_execution_timeout}] "
                                        "Cancelling.")
                    job_internal.cancel_all_operations(data_store=self)
                    self.logger.info(f"Job=[{job_internal}] has been cancelled.")
        return job_internal
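    # The timeout check above is lazy: it only runs when a job is fetched through
    # one of the get_job_by_* accessors or list_operations below. For example, a
    # call like get_job_by_name(name, max_execution_timeout=3600) cancels the job
    # on access if it has been in the EXECUTING stage for an hour or more, while a
    # job that is never fetched will not be cancelled through this path.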

    @DurationMetric(DATA_STORE_GET_JOB_BY_NAME_TIME_METRIC_NAME, instanced=True)
    def get_job_by_name(self, name, *, max_execution_timeout=None):
        job = self.jobs_by_name.get(name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    @DurationMetric(DATA_STORE_GET_JOB_BY_DIGEST_TIME_METRIC_NAME, instanced=True)
    def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
        job = self.jobs_by_action.get(action_digest.hash)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    @DurationMetric(DATA_STORE_GET_JOB_BY_OPERATION_TIME_METRIC_NAME, instanced=True)
    def get_job_by_operation(self, name, *, max_execution_timeout=None):
        job = self.jobs_by_operation.get(name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    def get_all_jobs(self):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage != OperationStage.COMPLETED]

    def get_jobs_by_stage(self, operation_stage):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage == operation_stage]

    def _get_job_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_jobs_by_stage(stage))))
        return results

    @DurationMetric(DATA_STORE_CREATE_JOB_TIME_METRIC_NAME, instanced=True)
    def create_job(self, job):
        self.jobs_by_action[job.action_digest.hash] = job
        self.jobs_by_name[job.name] = job
        if self._action_browser_url is not None:
            job.set_action_url(BrowserURL(self._action_browser_url, self._instance_name))

    @DurationMetric(DATA_STORE_QUEUE_JOB_TIME_METRIC_NAME, instanced=True)
    def queue_job(self, job_name):
        job = self.jobs_by_name[job_name]
        with self.queue_condition:
            if job.operation_stage != OperationStage.QUEUED:
                bisect.insort(self.queue, job)
                self.logger.info(f"Job queued: [{job.name}]")
            else:
                self.logger.info(f"Job already queued: [{job.name}]")
                self.queue.sort()
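    # Keeping the queue ordered relies on Job objects being comparable (the sort
    # key is assumed to come from the Job class, e.g. its priority):
    # bisect.insort places a newly queued job at its sorted position, and the
    # full sort() in the already-queued branch presumably restores ordering if
    # the job's sort key has changed since it was first inserted.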

    @DurationMetric(DATA_STORE_UPDATE_JOB_TIME_METRIC_NAME, instanced=True)
    def update_job(self, job_name, changes, skip_notify=False):
        # With this implementation, there's no need to actually make
        # changes to the stored job, since it's a reference to the
        # in-memory job that caused this method to be called.
        self.update_event_queue.put((job_name, changes, skip_notify))
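    # The tuple pushed onto update_event_queue is consumed by the JobWatcher
    # thread in wait_for_job_updates() below; `changes` is expected to contain
    # the names of the fields that were modified, and the watcher only reacts
    # when "stage" or "cancelled" is among them.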

    def delete_job(self, job_name):
        job = self.jobs_by_name[job_name]

        del self.jobs_by_action[job.action_digest.hash]
        del self.jobs_by_name[job.name]

        self.logger.info(f"Job deleted: [{job.name}]")

        if self.is_instrumented:
            for stage in OperationStage:
                self.operations_by_stage[stage].discard(job.name)

            for state in LeaseState:
                self.leases_by_state[state].discard(job.name)

    def wait_for_job_updates(self):
        self.logger.info("Starting job watcher thread")
        while self.watcher_keep_running:
            try:
                job_name, changes, skip_notify = self.update_event_queue.get()
            except EOFError:
                continue
            with DurationMetric(DATA_STORE_CHECK_FOR_UPDATE_TIME_METRIC_NAME):
                with self.watched_jobs_lock:
                    if (all(field not in changes for field in ("cancelled", "stage")) or
                            job_name not in self.watched_jobs):
                        # If the stage or cancellation state hasn't changed, we don't
                        # need to do anything with this event. Similarly, if we aren't
                        # watching this job, we can ignore the event.
                        continue
                    job = self.get_job_by_name(job_name)
                    spec = self.watched_jobs[job_name]
                    new_state = JobState(job)
                    if spec.last_state != new_state:
                        spec.last_state = new_state
                        if not skip_notify:
                            spec.event.notify_change()
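    # The watcher runs on the daemon thread started from __init__ and drains
    # update_event_queue, so update_job() itself stays cheap. The EOFError guard
    # presumably covers the underlying multiprocessing pipe being closed during
    # shutdown while this thread is still blocked in get().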

    def store_response(self, job, commit_changes):
        # The job is always in memory in this implementation, so there's
        # no need to write anything to the CAS, since the job stays in
        # memory as long as we need it.
        pass

    def get_operations_by_stage(self, operation_stage):
        return self.operations_by_stage.get(operation_stage, set())

    def _get_operation_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_operations_by_stage(stage))))
        return results

    @DurationMetric(DATA_STORE_LIST_OPERATIONS_TIME_METRIC_NAME, instanced=True)
    def list_operations(self,
                        operation_filters: List[OperationFilter] = None,
                        page_size: int = None,
                        page_token: str = None,
                        max_execution_timeout: int = None) -> Tuple[List[operations_pb2.Operation], str]:

        if operation_filters and operation_filters != DEFAULT_OPERATION_FILTERS:
            raise InvalidArgumentError("Filtering is not supported with the in-memory scheduler.")

        if page_token:
            raise InvalidArgumentError("page_token is not supported in the in-memory scheduler.")

        # Run through all the jobs and see if any of them are exceeding the
        # execution timeout; mark those as cancelled.
        for job in self.jobs_by_name.values():
            self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

        # Return all operations
        return [
            operation for job in self.jobs_by_name.values() for operation in job.get_all_operations()
        ], ""
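    # Pagination and filtering are not supported here: the method either raises
    # InvalidArgumentError or returns every operation of every known job in a
    # single page, with an empty string as the "next page" token.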

    @DurationMetric(DATA_STORE_CREATE_OPERATION_TIME_METRIC_NAME, instanced=True)
    def create_operation(self, operation_name, job_name, request_metadata=None):
        job = self.jobs_by_name[job_name]
        self.jobs_by_operation[operation_name] = job
        if self.is_instrumented:
            self.operations_by_stage[job.operation_stage].add(job_name)

    @DurationMetric(DATA_STORE_UPDATE_OPERATION_TIME_METRIC_NAME, instanced=True)
    def update_operation(self, operation_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_operation[operation_name]
            self.operations_by_stage[job.operation_stage].add(job.name)
            other_stages = [member for member in OperationStage if member != job.operation_stage]
            for stage in other_stages:
                self.operations_by_stage[stage].discard(job.name)

    def delete_operation(self, operation_name):
        del self.jobs_by_operation[operation_name]

    def get_leases_by_state(self, lease_state):
        return self.leases_by_state.get(lease_state, set())

    def _get_lease_count_by_state(self):
        results = []
        for state in LeaseState:
            results.append((state, len(self.get_leases_by_state(state))))
        return results

    @DurationMetric(DATA_STORE_CREATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def create_lease(self, lease):
        if self.is_instrumented:
            self.leases_by_state[LeaseState(lease.state)].add(lease.id)

    @DurationMetric(DATA_STORE_UPDATE_LEASE_TIME_METRIC_NAME, instanced=True)
    def update_lease(self, job_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_name[job_name]
            state = LeaseState(job.lease.state)
            self.leases_by_state[state].add(job.lease.id)
            other_states = [member for member in LeaseState if member != state]
            for other_state in other_states:
                self.leases_by_state[other_state].discard(job.lease.id)

    def load_unfinished_jobs(self):
        return []

    @DurationMetric(BOTS_ASSIGN_JOB_LEASES_TIME_METRIC_NAME, instanced=True)
    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
        """Assign a lease for the highest priority job that a worker can run.

        Iterate over the job queue and find the highest priority job which
        the worker can run given the provided capabilities. Takes a
        dictionary of worker capabilities to compare with job requirements.

        :param capabilities: Dictionary of worker capabilities to compare
            with job requirements when finding a job.
        :type capabilities: dict
        :param callback: Function to run on the next runnable job; should return
            a list of leases.
        :type callback: function
        :param timeout: Time to block waiting on the job queue, capped at
            MAX_JOB_BLOCK_TIME if longer.
        :type timeout: int
        :returns: The list of leases returned by the callback, or an empty list
            if no suitable job was found.

        """
        if not timeout and not self.queue:
            return []

        with self.queue_condition:
            leases = self._assign_lease(capabilities, callback)

            self.queue_condition.notify()

            if timeout:
                # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
                timeout = min(timeout, MAX_JOB_BLOCK_TIME)
                deadline = time.time() + timeout
                while not leases and time.time() < deadline:
                    ready = self.queue_condition.wait(timeout=deadline - time.time())
                    if not ready:
                        # If we ran out of time waiting for the condition variable,
                        # give up early.
                        break
                    leases = self._assign_lease(capabilities, callback, deadline=deadline)
                    self.queue_condition.notify()

        return leases
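    # Illustrative use by a bots service (names here are hypothetical): the
    # worker's Platform properties arrive as a dict of sets, and the callback
    # creates and returns the leases for whichever queued job is chosen, e.g.
    #
    #     capabilities = {"OSFamily": {"linux"}, "ISA": {"x86-64"}}
    #     leases = data_store.assign_lease_for_next_job(
    #         capabilities, create_lease_callback, timeout=5)
    #
    # where create_lease_callback is a hypothetical function that returns a list
    # of lease messages for the job it is given.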

    def _assign_lease(self, worker_capabilities, callback, deadline=None):
        for index, job in enumerate(self.queue):
            if deadline is not None and time.time() >= deadline:
                break
            # Don't assign a cancelled job; drop it from the queue, since it
            # would be unable to get a lease anyway.
            if job.cancelled:
                self.logger.debug(f"Dropping cancelled job: [{job.name}] from queue")
                del self.queue[index]
                continue

            if self._worker_is_capable(worker_capabilities, job):
                leases = callback(job)
                if leases:
                    del self.queue[index]
                    return leases
        return []

    def _worker_is_capable(self, worker_capabilities, job):
        """Returns whether the worker is suitable to run the job."""
        # TODO: Replace this with the logic defined in the Platform msg. standard.

        job_requirements = job.platform_requirements
        # For now we'll only check OS and ISA properties.

        if not job_requirements:
            return True

        for req, matches in job_requirements.items():
            if not matches <= worker_capabilities.get(req, set()):
                return False
        return True
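    # Capability matching is plain set containment: for every required property,
    # the job's set of acceptable values must be a subset of what the worker
    # advertises. For instance, a job requiring {"OSFamily": {"linux"}} is
    # matched by a worker advertising {"OSFamily": {"linux"}, "ISA": {"x86-64"}},
    # but not by one that only advertises {"ISA": {"x86-64"}}.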

    def get_metrics(self):
        metrics = {}
        metrics[MetricCategories.JOBS.value] = {
            stage.value: count for stage, count in self._get_job_count_by_stage()
        }
        metrics[MetricCategories.OPERATIONS.value] = {
            stage.value: count for stage, count in self._get_operation_count_by_stage()
        }
        metrics[MetricCategories.LEASES.value] = {
            state.value: count for state, count in self._get_lease_count_by_state()
        }

        return metrics
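    # The returned structure is a nested mapping keyed by metric category and
    # then by stage/state value; shape only (the actual keys come from the
    # MetricCategories, OperationStage and LeaseState enums), e.g.:
    #
    #     {
    #         MetricCategories.JOBS.value: {OperationStage.QUEUED.value: 3, ...},
    #         MetricCategories.OPERATIONS.value: {OperationStage.EXECUTING.value: 1, ...},
    #         MetricCategories.LEASES.value: {<LeaseState member>.value: 2, ...},
    #     }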