Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# Disable unwanted pylint rules 

16# pylint: disable=anomalous-backslash-in-string,broad-except 

17 

18 

19import asyncio 

20import logging 

21from functools import partial 

22 

23import grpc 

24 

25from buildgrid._enums import LeaseState 

26from buildgrid._protos.google.rpc import code_pb2 

27from buildgrid._protos.google.rpc import status_pb2 

28 

29from .tenant import Tenant 

30 

31 

class TenantManager:
    r"""Manages a number of :class:`Tenant`\ s.

    Creates work to do, monitors and removes leases of work.

    Tenants and the asyncio tasks running their work are tracked in two
    dictionaries keyed by lease id. A tenant only gets a task entry once
    :meth:`create_work` has been called for its lease.
    """

    def __init__(self):
        """Initialises an instance of the :class:`TenantManager`."""
        self.__logger = logging.getLogger(__name__)
        self._tenants = {}  # lease id -> Tenant
        self._tasks = {}  # lease id -> task created by create_work()

    def create_tenancy(self, lease):
        """Create a new :class:`Tenant`.

        Args:
            lease (:class:`Lease`) : Lease of work to do.

        Raises:
            KeyError: If a tenant is already registered under ``lease.id``.
        """
        lease_id = lease.id

        if lease_id in self._tenants:
            raise KeyError(f"Lease id already exists: [{lease_id}]")

        self._tenants[lease_id] = Tenant(lease)

    def remove_tenant(self, lease_id):
        """Attempts to remove a tenant.

        If the tenant has not been cancelled, it will cancel it (and keep it
        registered for now). If the tenant has been cancelled but has not
        completed, it will not remove it. Only a cancelled and completed
        tenant is actually dropped, together with its task if it has one.

        Args:
            lease_id (string) : The lease id.

        Raises:
            KeyError: If no tenant is registered under `lease_id`.
        """
        tenant = self._tenants[lease_id]

        if not tenant.lease_cancelled:
            self.__logger.error("Attempting to remove a lease not cancelled. "
                                "Bot will attempt to cancel lease. "
                                f"Lease id=[{lease_id}]")
            self.cancel_tenancy(lease_id)

        elif not tenant.tenant_completed:
            self.__logger.debug("Lease cancelled but tenant not completed. "
                                f"Lease id=[{lease_id}]")

        else:
            self.__logger.debug(f"Removing tenant=[{lease_id}]")
            self._tenants.pop(lease_id)
            # A tenant that never had work scheduled has no task entry, so
            # a missing key here is not an error.
            self._tasks.pop(lease_id, None)

    def get_leases(self):
        """Returns the leases managed by this instance.

        Returns:
            list: The lease of every registered tenant, or `None` when no
            tenant is registered.
        """
        leases = [tenant.lease for tenant in self._tenants.values()]
        return leases or None

    def get_lease_ids(self):
        """Returns a list of lease ids.

        The list is a snapshot, so callers may add or remove tenants while
        iterating over it.
        """
        return list(self._tenants)

    def get_lease_state(self, lease_id):
        """Returns the lease state

        Args:
            lease_id (string) : The lease id.

        Raises:
            KeyError: If no tenant is registered under `lease_id`.
        """
        return self._tenants[lease_id].get_lease_state()

    def complete_lease(self, lease_id, status, task=None):
        """Informs the :class:`TenantManager` that the lease has completed.

        If the task was not cancelled, the lease is updated with the result
        returned from the task, or with an error status when the task raised.
        The lease state is then set to ``LeaseState.COMPLETED``.

        Args:
            lease_id (string) : The lease id.
            status (:class:`Status`) : The final status of the lease, or
                `None` to leave the status untouched.
            task (asyncio.Task) : The task of work.

        Raises:
            KeyError: If no tenant is registered under `lease_id`.
        """
        if status is not None:
            self._update_lease_status(lease_id, status)

        if task and not task.cancelled():
            try:
                result = task.result()

            except grpc.RpcError as e:
                self.__logger.debug(f'Job was unsuccessful, with code {e.code()}')
                # Pass a Status message, as every other caller of
                # _update_lease_status does, rather than the bare gRPC code.
                self._update_lease_status(lease_id,
                                          self._status_from_rpc_error(e))

            except Exception as e:
                # Keep the traceback in the log; the lease itself only
                # carries the exception text.
                self.__logger.debug(
                    'An exception occurred during execution of the work. '
                    f'Setting status to {code_pb2.INTERNAL}',
                    exc_info=True)
                status = status_pb2.Status()
                status.code = code_pb2.INTERNAL
                status.message = str(e)
                self._update_lease_status(lease_id, status)

            else:
                self._update_lease_result(lease_id, result.result)

        self._update_lease_state(lease_id, LeaseState.COMPLETED)

    def create_work(self, lease_id, work, context):
        """Creates work to do.

        Will place work on an asyncio loop with a callback to `complete_lease`.

        Args:
            lease_id (string) : The lease id.
            work (func) : Work to do.
            context (context) : Context for work function.

        Raises:
            KeyError: If no tenant is registered under `lease_id`.
        """
        self._update_lease_state(lease_id, LeaseState.ACTIVE)
        tenant = self._tenants[lease_id]
        task = asyncio.ensure_future(tenant.run_work(work, context))

        # Done callbacks receive the finished future as their only argument,
        # so this ends up calling complete_lease(lease_id, None, task).
        task.add_done_callback(partial(self.complete_lease, lease_id, None))
        self._tasks[lease_id] = task

    def cancel_tenancy(self, lease_id):
        """Cancels tenancy and any work being done.

        Does nothing if the lease has already been cancelled.

        Args:
            lease_id (string) : The lease id.

        Raises:
            KeyError: If no tenant is registered under `lease_id`.
        """
        tenant = self._tenants[lease_id]
        if tenant.lease_cancelled:
            return

        tenant.cancel_lease()

        # Work may not have been scheduled yet, in which case there is no
        # task to cancel.
        task = self._tasks.get(lease_id)
        if task is not None:
            task.cancel()

    def tenant_completed(self, lease_id):
        """Returns `True` if the work has been completed.

        Args:
            lease_id (string) : The lease id.

        Raises:
            KeyError: If no tenant is registered under `lease_id`.
        """
        return self._tenants[lease_id].tenant_completed

    async def wait_on_tenants(self, timeout):
        """Waits until any tracked task finishes or `timeout` expires.

        Returns immediately when no task is being tracked.

        Args:
            timeout (float) : Value forwarded to :func:`asyncio.wait` as its
                `timeout` argument.
        """
        if self._tasks:
            pending_view = list(self._tasks.values())
            await asyncio.wait(pending_view,
                               timeout=timeout,
                               return_when=asyncio.FIRST_COMPLETED)

    @staticmethod
    def _status_from_rpc_error(error):
        """Builds a ``Status`` message from a failed gRPC call."""
        status = status_pb2.Status()
        # grpc.StatusCode members hold an (integer code, description) tuple.
        status.code = error.code().value[0]
        details = getattr(error, 'details', None)
        status.message = (details() if callable(details) else None) or str(error)
        return status

    def _update_lease_result(self, lease_id, result):
        """Updates the lease with the result."""
        self._tenants[lease_id].update_lease_result(result)

    def _update_lease_state(self, lease_id, state):
        """Updates the lease state."""
        self._tenants[lease_id].update_lease_state(state)

    def _update_lease_status(self, lease_id, status):
        """Updates the lease status."""
        self._tenants[lease_id].update_lease_status(status)