Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/rabbitmq/operations/instance.py: 57.14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

126 statements  

1# Copyright (C) 2021 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17OperationsInstance 

18================== 

19An instance of the LongRunningOperations Service. 

20""" 

21 

22from collections import OrderedDict 

23import logging 

24from threading import Lock 

25from typing import Optional 

26 

27from google.protobuf import timestamp_pb2 

28import pika # type: ignore 

29 

30from buildgrid._enums import ExchangeNames, QueueNames 

31from buildgrid._exceptions import InvalidArgumentError 

32from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 

33from buildgrid._protos.buildgrid.v2.messaging_pb2 import CreateOperation, UpdateOperations 

34from buildgrid._protos.google.longrunning import operations_pb2 

35from buildgrid.server.metrics_names import ( 

36 OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, 

37 OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, 

38 OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME 

39) 

40from buildgrid.server.metrics_utils import DurationMetric 

41from buildgrid.server.operations.filtering import FilterParser, DEFAULT_OPERATION_FILTERS 

42from buildgrid.server.persistence.sql.impl import DataStoreInterface 

43from buildgrid.server.rabbitmq.pika_consumer import RetryingPikaConsumer, QueueBinding 

44from buildgrid.settings import DEFAULT_MAX_LIST_OPERATION_PAGE_SIZE 

45 

46 

class OperationState:
    """Bundle of an Operation's latest known state.

    Groups the operation message itself with the timestamp of the update
    that produced it and, optionally, the ``RequestMetadata`` captured when
    the operation was created. Used as the value type of
    ``OperationStateCache``.
    """

    def __init__(self,
                 operation: Optional[operations_pb2.Operation],
                 timestamp: timestamp_pb2.Timestamp,
                 request_metadata: Optional[remote_execution_pb2.RequestMetadata] = None):
        # `operation` may be None for entries created from a CreateOperation
        # message, before any UpdateOperations message has arrived.
        self.operation = operation
        # Creation time of the message that produced this state.
        self.timestamp = timestamp
        # Metadata recorded at operation-creation time, if any.
        self.request_metadata = request_metadata

56 

57 

class OperationStateCache:
    def __init__(self, capacity: int):
        """Simple in-memory LRU storage container.

        Args:
            capacity (int): the maximum number of entries that can be cached.
                After exceeding the capacity, older entries will be dropped
                to make room for new ones, following a Least Recently Used
                policy.

        Raises:
            ValueError: if ``capacity`` is not positive.
        """
        if capacity <= 0:
            raise ValueError("Capacity must be positive")

        self._capacity = capacity

        # Guards all access to `_ordered_dict`; entries are kept in
        # least-recently-used-first order so eviction pops from the front.
        self._ordered_dict_lock = Lock()
        self._ordered_dict: "OrderedDict[str, OperationState]" = OrderedDict()

    @property
    def size(self) -> int:
        """Number of elements in the cache."""
        with self._ordered_dict_lock:
            return len(self._ordered_dict)

    @property
    def capacity(self) -> int:
        """Maximum number of elements that fit in the cache."""
        return self._capacity

    def update(self,
               operation_name: str,
               operation: "Optional[operations_pb2.Operation]",
               timestamp: "timestamp_pb2.Timestamp",
               request_metadata: "Optional[remote_execution_pb2.RequestMetadata]" = None):
        """Create or update a cache entry for the given operation.

        If no ``request_metadata`` is given for an ``operation_name`` that
        is already present, it will keep the value of ``request_metadata``
        previously written.
        """
        with self._ordered_dict_lock:
            if operation_name in self._ordered_dict:
                # Existing operation, leave `RequestMetadata` untouched:
                self._ordered_dict[operation_name].operation = operation
                self._ordered_dict[operation_name].timestamp = timestamp

                self._ordered_dict.move_to_end(operation_name, last=True)
            else:
                self._ordered_dict[operation_name] = OperationState(operation=operation,
                                                                    timestamp=timestamp,
                                                                    request_metadata=request_metadata)

            # Evict the least-recently-used entry if over capacity:
            if len(self._ordered_dict) > self._capacity:
                self._ordered_dict.popitem(last=False)

    def get(self, operation_name: str) -> "Optional[OperationState]":
        """Get a value defined in the cache. If a value for the given key is
        not found, returns None.
        Updates the last access time of the entry.
        """
        with self._ordered_dict_lock:
            state = self._ordered_dict.get(operation_name, None)
            if state is not None:
                self._ordered_dict.move_to_end(operation_name, last=True)
            return state

122 

123 

124class OperationsInstance: 

125 

126 def __init__(self, 

127 instance_name: str, 

128 rabbitmq_connection_parameters: pika.connection.Parameters, 

129 operations_datastore: DataStoreInterface, 

130 operation_state_cache_capacity: int = 1000, 

131 max_connection_attempts: int = 0, 

132 max_list_operations_page_size=DEFAULT_MAX_LIST_OPERATION_PAGE_SIZE): 

133 """Instantiate a new ``OperationsInstance``. 

134 

135 Args: 

136 instance_name (str): name of the instance 

137 rabbitmq_connection_parameters (pika.connection.Parameters): 

138 connection details of the RabbitMQ server from which to read 

139 updates 

140 operations_datastore (DataStoreInterface): underlying storage 

141 for Operations 

142 operation_state_cache_capacity (int): maximum number of Operations 

143 that can be cached in memory 

144 max_connection_attempts (int): maximum connection attempts to 

145 the RabbitMQ server (default: 0 = no limit) 

146 max_list_operations_page_size (int): size limit for pages returned 

147 by ``ListOperations()`` 

148 """ 

149 self._logger = logging.getLogger(__name__) 

150 

151 self._operations_datastore = operations_datastore 

152 

153 self._max_list_operations_page_size = max_list_operations_page_size 

154 

155 self._operation_state_cache = OperationStateCache(capacity=operation_state_cache_capacity) 

156 

157 self._instance_name = instance_name 

158 self._instance_is_registered = False 

159 

160 self._stopped = False 

161 

162 queue = QueueNames.OPERATION_UPDATES.value 

163 exchange = ExchangeNames.OPERATION_UPDATES.value 

164 binding_key = f'*.{self._instance_name}' # "<state>.<instance_name>" 

165 

166 self._logger.debug(f"Consuming messages from queue '{queue}', " 

167 f"exchange '{exchange}', and routing key '{binding_key}'") 

168 queue_binding = QueueBinding(queue=queue, exchange=exchange, routing_key=binding_key) 

169 self._rabbitmq_consumer = RetryingPikaConsumer(connection_parameters=rabbitmq_connection_parameters, 

170 exchanges={ExchangeNames.OPERATION_UPDATES.value: 

171 pika.exchange_type.ExchangeType.topic}, 

172 bindings={queue_binding}, 

173 max_connection_attempts=max_connection_attempts) 

174 self._rabbitmq_consumer.subscribe(queue_name=QueueNames.OPERATION_UPDATES.value, 

175 callback=self._process_operation_update) 

176 

177 def __del__(self): 

178 self.stop() 

179 

180 # --- Public API --- 

181 @property 

182 def instance_name(self): 

183 return self._instance_name 

184 

185 def register_instance_with_server(self, server): 

186 """Names and registers the operations instance with a given server.""" 

187 if self._instance_is_registered: 

188 raise AssertionError("Instance already registered") 

189 

190 server.add_operations_instance(self, self._instance_name) 

191 self._instance_is_registered = True 

192 

193 @DurationMetric(OPERATIONS_GET_OPERATION_TIME_METRIC_NAME, instanced=True) 

194 def get_operation(self, job_name): 

195 # Local cache: 

196 operation_state = self._operation_state_cache.get(job_name) 

197 if operation_state: 

198 return operation_state.operation, operation_state.request_metadata 

199 

200 # Centralized datastore: 

201 operation = self._operations_datastore.get_job_by_operation(job_name) 

202 if not operation: 

203 raise InvalidArgumentError(f"Operation name does not exist: [{job_name}]") 

204 metadata = self._operations_datastore.get_operation_request_metadata_by_name(job_name) 

205 return operation, metadata 

206 

207 @DurationMetric(OPERATIONS_LIST_OPERATIONS_TIME_METRIC_NAME, instanced=True) 

208 def list_operations(self, filter_string, page_size, page_token): 

209 if page_size and page_size > self._max_list_operations_page_size: 

210 raise InvalidArgumentError(f"The maximum page size is " 

211 f"{self._max_list_operations_page_size}.") 

212 if not page_size: 

213 page_size = self._max_list_operations_page_size 

214 

215 operation_filters = FilterParser.parse_listoperations_filters(filter_string) 

216 if not operation_filters: 

217 operation_filters = DEFAULT_OPERATION_FILTERS 

218 

219 response = operations_pb2.ListOperationsResponse() 

220 

221 results, next_token = self._operations_datastore.list_operations(operation_filters, 

222 page_size, 

223 page_token) 

224 response.operations.extend(results) 

225 response.next_page_token = next_token 

226 

227 return response 

228 

229 def delete_operation(self, job_name): 

230 """ DeleteOperation is not supported in BuildGrid. """ 

231 pass 

232 

233 @DurationMetric(OPERATIONS_CANCEL_OPERATION_TIME_METRIC_NAME, instanced=True) 

234 def cancel_operation(self, job_name): 

235 pass 

236 

237 def stop(self): 

238 if not self._stopped: 

239 self._rabbitmq_consumer.stop() 

240 

241 # --- Private API --- 

242 def _process_operation_update(self, message: bytes, delivery_tag: str): 

243 # Checking whether the messages is Create/UpdateOperation. 

244 

245 create_operation = CreateOperation() 

246 create_operation.ParseFromString(message) 

247 

248 request_metadata = None 

249 operation = None 

250 

251 if create_operation.action_digest.size_bytes > 0: 

252 self._logger.debug(f"Received CreateOperation message " 

253 f"[{create_operation}] (delivery tag: {delivery_tag})") 

254 operation_name = create_operation.job_id 

255 message_creation_timestamp = create_operation.timestamp 

256 request_metadata = create_operation.request_metadata 

257 else: 

258 update_operations = UpdateOperations() 

259 update_operations.ParseFromString(message) 

260 

261 operation_name = update_operations.job_id 

262 if operation_name: 

263 self._logger.debug(f"Received UpdateOperations message " 

264 f"[{update_operations}] (delivery tag: {delivery_tag})") 

265 operation = update_operations.operation_state 

266 message_creation_timestamp = update_operations.timestamp 

267 

268 if operation_name: 

269 # Updating in-memory cache: 

270 self._operation_state_cache.update(operation_name=operation_name, 

271 operation=operation, 

272 timestamp=message_creation_timestamp, 

273 request_metadata=request_metadata) 

274 # TODO: update `self._operations_datastore` as well 

275 

276 self._rabbitmq_consumer.ack_message(delivery_tag)