Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/execution/instance.py: 95.51%
89 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
ExecutionInstance
=================
An instance of the Remote Execution Service.
"""
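
# A minimal usage sketch (illustrative only; assumes a configured ``scheduler``, an Action digest
# ``action_digest`` already uploaded to storage, and a per-request ``cancellation_context``):
#
#     instance = ExecutionInstance(scheduler, operation_stream_keepalive_timeout=30)
#     instance.start()
#     operation_name = instance.execute(action_digest=action_digest, skip_cache_lookup=False)
#     for operation in instance.stream_operation_updates(operation_name, cancellation_context):
#         ...  # react to each update; the stream ends once the operation is done
#     instance.stop()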

from contextlib import ExitStack
from typing import Iterable, Optional

from buildgrid_metering.models.dataclasses import Identity, RPCUsage, Usage

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import DESCRIPTOR as RE_DESCRIPTOR
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import (
    Action,
    Command,
    Digest,
    Platform,
    RequestMetadata,
)
from buildgrid._protos.google.longrunning.operations_pb2 import Operation
from buildgrid.server.auth.manager import get_context_client_identity
from buildgrid.server.enums import MeteringThrottleAction
from buildgrid.server.exceptions import FailedPreconditionError, InvalidArgumentError, ResourceExhaustedError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.scheduler import Scheduler
from buildgrid.server.servicer import Instance
from buildgrid.server.sql.models import ClientIdentityEntry
from buildgrid.server.utils.cancellation import CancellationContext

# All priorities >= this value will not be throttled / deprioritized
EXECUTION_DEPRIORITIZATION_LIMIT = 1

LOGGER = buildgrid_logger(__name__)


class ExecutionInstance(Instance):
    SERVICE_NAME = RE_DESCRIPTOR.services_by_name["Execution"].full_name

    def __init__(
        self,
        scheduler: Scheduler,
        operation_stream_keepalive_timeout: Optional[int] = None,
    ) -> None:
        self._stack = ExitStack()
        self.scheduler = scheduler
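
        # Used as the wait timeout in stream_operation_updates: if no update arrives within this
        # many seconds, the current operation state is re-sent to the client as a keepalive.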
        self._operation_stream_keepalive_timeout = operation_stream_keepalive_timeout

    def start(self) -> None:
        self.scheduler.start(start_job_watcher=True)
        self._stack.callback(self.scheduler.stop)

    def stop(self) -> None:
        self._stack.close()
        LOGGER.info("Stopped Execution.")

    def execute(
        self,
        *,
        action_digest: Digest,
        skip_cache_lookup: bool,
        priority: int = 0,
        request_metadata: Optional[RequestMetadata] = None,
        client_identity: Optional[ClientIdentityEntry] = None,
    ) -> str:
        """
        Sends a job for execution. Queues an action and creates an Operation to be associated with this action.
        """

        action = self.scheduler.storage.get_message(action_digest, Action)
        if not action:
            raise FailedPreconditionError("Could not get action from storage.")

        command = self.scheduler.storage.get_message(action.command_digest, Command)
        if not command:
            raise FailedPreconditionError("Could not get command from storage.")

        if action.HasField("platform"):
            platform = action.platform
        elif command.HasField("platform"):
            platform = command.platform
        else:
            platform = Platform()

        property_label, platform_requirements = self.scheduler.property_set.execution_properties(platform)
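
        # Throttling: if the metering service reports this client as over quota (and the request
        # priority is below EXECUTION_DEPRIORITIZATION_LIMIT), either reject the request or
        # deprioritize it, depending on the configured MeteringThrottleAction.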
        should_throttle = self._should_throttle_execution(priority, client_identity)
        if should_throttle:
            if self.scheduler.metering_throttle_action == MeteringThrottleAction.REJECT:
                raise ResourceExhaustedError("User quota exceeded")

            # TODO: test_execution_instance is a total mess. It mocks way too much, making tests
            # brittle. When possible, merge it into the execution_service tests and use proper logging here.
            # We should be able to write `action_digest=[{action_digest.hash}/{action_digest.size_bytes}]`,
            # but can't: AttributeError: 'str' object has no attribute 'hash'
            LOGGER.info(
                "Job priority throttled.",
                tags=dict(digest=action_digest, old_priority=priority, new_priority=EXECUTION_DEPRIORITIZATION_LIMIT),
            )
            priority = EXECUTION_DEPRIORITIZATION_LIMIT

        operation_name = self.scheduler.queue_job_action(
            action=action,
            action_digest=action_digest,
            command=command,
            platform_requirements=platform_requirements,
            property_label=property_label,
            skip_cache_lookup=skip_cache_lookup,
            priority=priority,
            request_metadata=request_metadata,
            client_identity=client_identity,
        )
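        # Record this execution against the client's metering usage (a no-op when metering
        # is not configured or the client is unidentified).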
        self._meter_execution(client_identity, operation_name)
        return operation_name

    def stream_operation_updates(self, operation_name: str, context: CancellationContext) -> Iterable[Operation]:
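        """Yield the operation's current state, then again after each update until it is done.

        The stream also wakes up every ``operation_stream_keepalive_timeout`` seconds to re-send
        the current state as a keepalive, and stops early once the request's CancellationContext
        is cancelled.
        """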
        job_name = self.scheduler.get_operation_job_name(operation_name)
        if not job_name:
            raise InvalidArgumentError(f"Operation name does not exist: [{operation_name}]")
        # Start the listener as soon as we get the job name and re-query. This avoids potentially missing
        # the completed update if it triggers in between sending back the first result and the yield resuming.
        with self.scheduler.ops_notifier.subscription(job_name) as update_requested:
            yield (operation := self.scheduler.load_operation(operation_name))
            if operation.done:
                return

            # When the context is deactivated, we can quickly stop waiting.
            context.on_cancel(update_requested.set)
            while not context.is_cancelled():
                update_requested.wait(timeout=self._operation_stream_keepalive_timeout)
                update_requested.clear()

                if context.is_cancelled():
                    return

                yield (operation := self.scheduler.load_operation(operation_name))
                if operation.done:
                    return

    def _meter_execution(self, client_identity: Optional[ClientIdentityEntry], operation_name: str) -> None:
        """Meter the number of executions made by this client."""
        if self.scheduler.metering_client is None or client_identity is None:
            return
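        # Publishing usage is best-effort: any exception is logged and swallowed so that a
        # metering outage never fails the execute request.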
        try:
            identity = Identity(
                instance=client_identity.instance,
                workflow=client_identity.workflow,
                actor=client_identity.actor,
                subject=client_identity.subject,
            )
            usage = Usage(rpc=RPCUsage(execute=1))
            self.scheduler.metering_client.put_usage(identity, operation_name, usage)
        except Exception as exc:
            LOGGER.exception(
                f"Failed to publish execution usage for identity: {get_context_client_identity()}", exc_info=exc
            )

    def _should_throttle_execution(self, priority: int, client_identity: Optional[ClientIdentityEntry]) -> bool:
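        """Return True if the metering service reports this client as throttled.

        Requests whose priority is already at or above EXECUTION_DEPRIORITIZATION_LIMIT, requests
        without a client identity, and deployments without a metering client are never throttled.
        Errors from the metering service are logged and treated as "not throttled".
        """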
        if (
            priority >= EXECUTION_DEPRIORITIZATION_LIMIT
            or self.scheduler.metering_client is None
            or client_identity is None
        ):
            return False
        try:
            identity = Identity(
                instance=client_identity.instance,
                workflow=client_identity.workflow,
                actor=client_identity.actor,
                subject=client_identity.subject,
            )
            response = self.scheduler.metering_client.get_throttling(identity)
            if response.throttled:
                LOGGER.info(
                    "Execution request is throttled.",
                    tags=dict(client_id=client_identity, usage=response.tracked_usage),
                )
            return response.throttled
        except Exception as exc:
            LOGGER.exception("Failed to get throttling information.", exc_info=exc)
            return False