# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import bisect
import logging
from multiprocessing import Queue
from threading import Lock, Thread
from typing import List
from datetime import datetime
import time

from buildgrid._protos.google.longrunning import operations_pb2
from ...._enums import LeaseState, MetricCategories, OperationStage
from ....utils import Condition, JobState
from ....settings import MAX_JOB_BLOCK_TIME
from ..interface import DataStoreInterface


class MemoryDataStore(DataStoreInterface):

    def __init__(self, storage):
        super().__init__()
        self.logger = logging.getLogger(__file__)
        self.logger.info("Creating in-memory scheduler")

        self.queue = []
        self.queue_lock = Lock()
        self.queue_condition = Condition(lock=self.queue_lock)

        self.jobs_by_action = {}
        self.jobs_by_operation = {}
        self.jobs_by_name = {}

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

        self.update_event_queue = Queue()
        self.watcher = Thread(name="JobWatcher", target=self.wait_for_job_updates, daemon=True)
        self.watcher_keep_running = True
        self.watcher.start()

    def __repr__(self):
        return "In-memory data store interface"

    def activate_monitoring(self):
        if self.is_instrumented:
            return

        self.operations_by_stage = {
            stage: set() for stage in OperationStage
        }
        self.leases_by_state = {
            state: set() for state in LeaseState
        }
        self.is_instrumented = True

    def deactivate_monitoring(self):
        if not self.is_instrumented:
            return

        self.operations_by_stage = {}
        self.leases_by_state = {}
        self.is_instrumented = False

    def _check_job_timeout(self, job_internal, *, max_execution_timeout=None):
79 """ Do a lazy check of maximum allowed job timeouts when clients try to retrieve 

80 an existing job. 

81 Cancel the job and related operations/leases, if we detect they have 

82 exceeded timeouts on access. 

83 

84 Returns the `buildgrid.server.Job` object, possibly updated with `cancelled=True`. 

85 """ 

        if job_internal and max_execution_timeout and job_internal.worker_start_timestamp_as_datetime:
            if job_internal.operation_stage == OperationStage.EXECUTING:
                executing_duration = datetime.utcnow() - job_internal.worker_start_timestamp_as_datetime
                if executing_duration.total_seconds() >= max_execution_timeout:
                    self.logger.warning(f"Job=[{job_internal}] has been executing for "
                                        f"executing_duration=[{executing_duration}]. "
                                        f"max_execution_timeout=[{max_execution_timeout}] "
                                        "Cancelling.")
                    job_internal.cancel_all_operations(data_store=self)
                    self.logger.info(f"Job=[{job_internal}] has been cancelled.")
        return job_internal

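    # Illustrative sketch of the timeout arithmetic used above; pure stdlib, with a
    # hypothetical two-hour-old start time and a one-hour maximum (not BuildGrid API):
    #
    #     from datetime import datetime, timedelta
    #     worker_start = datetime.utcnow() - timedelta(hours=2)
    #     executing_duration = datetime.utcnow() - worker_start
    #     executing_duration.total_seconds() >= 3600  # True, so the job gets cancelled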

    def get_job_by_name(self, name, *, max_execution_timeout=None):
        job = self.jobs_by_name.get(name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    def get_job_by_action(self, action_digest, *, max_execution_timeout=None):
        job = self.jobs_by_action.get(action_digest.hash)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    def get_job_by_operation(self, name, *, max_execution_timeout=None):
        job = self.jobs_by_operation.get(name)
        return self._check_job_timeout(job, max_execution_timeout=max_execution_timeout)

    def get_all_jobs(self):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage != OperationStage.COMPLETED]

    def get_jobs_by_stage(self, operation_stage):
        return [job for job in self.jobs_by_name.values()
                if job.operation_stage == operation_stage]

    def _get_job_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_jobs_by_stage(stage))))
        return results

    def create_job(self, job):
        self.jobs_by_action[job.action_digest.hash] = job
        self.jobs_by_name[job.name] = job

    def queue_job(self, job_name):
        job = self.jobs_by_name[job_name]
        with self.queue_condition:
            if job.operation_stage != OperationStage.QUEUED:
                bisect.insort(self.queue, job)
                self.logger.info(f"Job queued: [{job.name}]")
            else:
                self.logger.info(f"Job already queued: [{job.name}]")
                self.queue.sort()

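    # The queue is kept in priority order by relying on the ordering that Job
    # objects define for sorting; a minimal sketch of the same bisect.insort
    # behaviour using plain integers as stand-in priorities (not BuildGrid objects):
    #
    #     import bisect
    #     queue = [1, 3, 5]
    #     bisect.insort(queue, 4)  # queue is now [1, 3, 4, 5], no full re-sort needed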

    def update_job(self, job_name, changes):
        # With this implementation, there's no need to actually make
        # changes to the stored job, since it's a reference to the
        # in-memory job that caused this method to be called.
        self.update_event_queue.put((job_name, changes))

    def delete_job(self, job_name):
        job = self.jobs_by_name[job_name]

        del self.jobs_by_action[job.action_digest.hash]
        del self.jobs_by_name[job.name]

        self.logger.info(f"Job deleted: [{job.name}]")

        if self.is_instrumented:
            for stage in OperationStage:
                self.operations_by_stage[stage].discard(job.name)

            for state in LeaseState:
                self.leases_by_state[state].discard(job.name)

    def wait_for_job_updates(self):
        self.logger.info("Starting job watcher thread")
        while self.watcher_keep_running:
            try:
                job_name, changes = self.update_event_queue.get()
            except EOFError:
                continue
            with self.watched_jobs_lock:
                if (all(field not in changes for field in ("cancelled", "stage")) or
                        job_name not in self.watched_jobs):
                    # If the stage or cancellation state haven't changed, we don't
                    # need to do anything with this event. Similarly, if we aren't
                    # watching this job, we can ignore the event.
                    continue
                job = self.get_job_by_name(job_name)
                spec = self.watched_jobs[job_name]
                new_state = JobState(job)
                if spec.last_state != new_state:
                    spec.last_state = new_state
                    spec.event.notify_change()

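    # Sketch of the event flow between update_job() and this watcher thread, using
    # only a stdlib Queue; the payload shape mirrors what update_job() enqueues,
    # and the job name shown is a made-up placeholder:
    #
    #     from multiprocessing import Queue
    #     events = Queue()
    #     events.put(("example-job", {"stage": OperationStage.COMPLETED}))
    #     job_name, changes = events.get()
    #     "stage" in changes  # True, so watchers of "example-job" would be notified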

    def store_response(self, job):
        # The job is always in memory in this implementation, so there's
        # no need to write anything to the CAS, since the job stays in
        # memory as long as we need it
        pass

    def get_operations_by_stage(self, operation_stage):
        return self.operations_by_stage.get(operation_stage, set())

    def _get_operation_count_by_stage(self):
        results = []
        for stage in OperationStage:
            results.append((stage, len(self.get_operations_by_stage(stage))))
        return results

    def get_all_operations(self) -> List[operations_pb2.Operation]:
        return [
            operation for job in self.jobs_by_name.values() for operation in job.get_all_operations()
        ]

    def create_operation(self, operation, job_name):
        job = self.jobs_by_name[job_name]
        self.jobs_by_operation[operation.name] = job
        if self.is_instrumented:
            self.operations_by_stage[job.operation_stage].add(job_name)

    def update_operation(self, operation_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_operation[operation_name]
            self.operations_by_stage[job.operation_stage].add(job.name)
            other_stages = [member for member in OperationStage if member != job.operation_stage]
            for stage in other_stages:
                self.operations_by_stage[stage].discard(job.name)

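    # The instrumentation bookkeeping above boils down to "keep the job name in
    # exactly one per-stage set"; a minimal pure-Python sketch of that pattern
    # (the string stages and job name are illustrative placeholders):
    #
    #     by_stage = {stage: set() for stage in ("QUEUED", "EXECUTING", "COMPLETED")}
    #     by_stage["EXECUTING"].add("example-job")
    #     for other in (s for s in by_stage if s != "EXECUTING"):
    #         by_stage[other].discard("example-job")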

    def delete_operation(self, operation_name):
        del self.jobs_by_operation[operation_name]

    def get_leases_by_state(self, lease_state):
        return self.leases_by_state.get(lease_state, set())

    def _get_lease_count_by_state(self):
        results = []
        for state in LeaseState:
            results.append((state, len(self.get_leases_by_state(state))))
        return results

    def create_lease(self, lease):
        if self.is_instrumented:
            self.leases_by_state[LeaseState(lease.state)].add(lease.id)

    def update_lease(self, job_name, changes):
        if self.is_instrumented:
            job = self.jobs_by_name[job_name]
            state = LeaseState(job.lease.state)
            self.leases_by_state[state].add(job.lease.id)
            other_states = [member for member in LeaseState if member != state]
            for state in other_states:
                self.leases_by_state[state].discard(job.lease.id)

    def load_unfinished_jobs(self):
        return []

    def assign_lease_for_next_job(self, capabilities, callback, timeout=None):
243 """Return the highest priority job that can be run by a worker. 

244 

245 Iterate over the job queue and find the highest priority job which 

246 the worker can run given the provided capabilities. Takes a 

247 dictionary of worker capabilities to compare with job requirements. 

248 

249 :param capabilities: Dictionary of worker capabilities to compare 

250 with job requirements when finding a job. 

251 :type capabilities: dict 

252 :param callback: Function to run on the next runnable job, should return 

253 a list of leases. 

254 :type callback: function 

255 :param timeout: time to block waiting on job queue, caps if longer 

256 than MAX_JOB_BLOCK_TIME. 

257 :type timeout: int 

258 :returns: A job 

259 

260 """ 

        if not timeout and not self.queue:
            return []

        with self.queue_condition:
            leases = self._assign_lease(capabilities, callback)

            self.queue_condition.notify()

            if timeout:
                # Cap the timeout if it's larger than MAX_JOB_BLOCK_TIME
                timeout = min(timeout, MAX_JOB_BLOCK_TIME)
                deadline = time.time() + timeout
                while not leases and time.time() < deadline:
                    ready = self.queue_condition.wait(timeout=deadline - time.time())
                    if not ready:
                        # If we ran out of time waiting for the condition variable,
                        # give up early.
                        break
                    leases = self._assign_lease(capabilities, callback, deadline=deadline)
                    self.queue_condition.notify()

        return leases

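    # Hedged usage sketch from the worker-facing side; `create_leases_for` is a
    # hypothetical callback written for illustration (it leans on the `job.lease`
    # attribute used elsewhere in this class), and the capability keys shown are
    # illustrative, not part of this module's API:
    #
    #     def create_leases_for(job):
    #         return [job.lease] if job.lease is not None else []
    #
    #     leases = data_store.assign_lease_for_next_job(
    #         {"OSFamily": {"linux"}, "ISA": {"x86-64"}}, create_leases_for, timeout=5)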

    def _assign_lease(self, worker_capabilities, callback, deadline=None):
        # Walk the queue by index so that removing entries doesn't skip or
        # mis-target the jobs that follow them.
        index = 0
        while index < len(self.queue):
            if deadline is not None and time.time() >= deadline:
                break
            job = self.queue[index]
            # Don't queue a cancelled job, it would be unable to get a lease anyway
            if job.cancelled:
                self.logger.debug(f"Dropping cancelled job: [{job.name}] from queue")
                del self.queue[index]
                continue

            if self._worker_is_capable(worker_capabilities, job):
                leases = callback(job)
                if leases:
                    del self.queue[index]
                    return leases
            index += 1
        return []

    def _worker_is_capable(self, worker_capabilities, job):
        """Returns whether the worker is suitable to run the job."""
        # TODO: Replace this with the logic defined in the Platform msg. standard.

        job_requirements = job.platform_requirements
        # For now we'll only check OS and ISA properties.

        if not job_requirements:
            return True

        for req, matches in job_requirements.items():
            if not matches <= worker_capabilities.get(req, set()):
                return False
        return True

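    # The capability check above is a per-property subset test; a minimal sketch
    # with plain sets (the property name and values are illustrative placeholders):
    #
    #     worker_capabilities = {"ISA": {"x86-64", "x86-32"}}
    #     job_requirements = {"ISA": {"x86-64"}}
    #     all(required <= worker_capabilities.get(prop, set())
    #         for prop, required in job_requirements.items())  # True: worker can run it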

    def get_metrics(self):
        metrics = {}
        metrics[MetricCategories.JOBS.value] = {
            stage.value: count for stage, count in self._get_job_count_by_stage()
        }
        metrics[MetricCategories.OPERATIONS.value] = {
            stage.value: count for stage, count in self._get_operation_count_by_stage()
        }
        metrics[MetricCategories.LEASES.value] = {
            state.value: count for state, count in self._get_lease_count_by_state()
        }

        return metrics
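    # The returned mapping is keyed by metric category and then by stage/state
    # value; roughly this shape (the counts are made-up examples):
    #
    #     {
    #         MetricCategories.JOBS.value: {OperationStage.QUEUED.value: 3, ...},
    #         MetricCategories.OPERATIONS.value: {OperationStage.EXECUTING.value: 1, ...},
    #         MetricCategories.LEASES.value: {LeaseState.ACTIVE.value: 1, ...},
    #     }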