Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/auth/config.py: 96.67%
30 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2023 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import os
16import re
17from typing import Dict, List, Literal, Optional, Union
19import yaml
20from pydantic import BaseModel, Field
class Acl(BaseModel):
    # One access-control rule. ``actor``, ``subject`` and ``workflow`` hold
    # regular-expression patterns; ``requests`` holds the permitted request
    # names. A field left as ``None`` places no restriction on that aspect.
    actor: Optional[str] = Field(default=None)
    requests: Optional[List[str]] = Field(default=None)
    subject: Optional[str] = Field(default=None)
    workflow: Optional[str] = Field(default=None)

    def is_authorized(
        self,
        request_name: str,
        actor: Optional[str] = None,
        subject: Optional[str] = None,
        workflow: Optional[str] = None,
    ) -> bool:
        """Report whether this rule permits the described request.

        Every constraint configured on the rule must be satisfied. Patterns
        are applied with ``re.match``, so they are anchored at the start of
        the value only; a missing (``None``) value is matched as ``""``.

        Args:
            request_name: Name of the request being made.
            actor: Actor attached to the request, if any.
            subject: Subject attached to the request, if any.
            workflow: Workflow attached to the request, if any.

        Returns:
            ``True`` when no configured constraint rejects the request,
            ``False`` otherwise.
        """
        # Checked in this order, stopping at the first configured pattern
        # that fails to match.
        constraints = (
            (self.actor, actor),
            (self.subject, subject),
            (self.workflow, workflow),
        )
        for pattern, candidate in constraints:
            if pattern is None:
                continue
            if re.match(pattern, candidate or "") is None:
                return False

        # With no request list configured, every request name is accepted.
        return self.requests is None or request_name in self.requests
class InstanceAuthorizationConfig(BaseModel):
    # Authorization settings for one instance: either the literal string
    # "all", or a list of ACL rules of which at least one must accept.
    allow: Union[Literal["all"], List[Acl]]

    def is_authorized(
        self,
        request_name: str,
        actor: Optional[str] = None,
        subject: Optional[str] = None,
        workflow: Optional[str] = None,
    ) -> bool:
        """Report whether the request is permitted for this instance.

        Args:
            request_name: Name of the request being made.
            actor: Actor attached to the request, if any.
            subject: Subject attached to the request, if any.
            workflow: Workflow attached to the request, if any.

        Returns:
            ``True`` when ``allow`` is ``"all"`` or when any ACL in ``allow``
            authorizes the request; ``False`` otherwise (including when the
            ACL list is empty).
        """
        if self.allow != "all":
            # The first ACL that accepts the request settles the question.
            for rule in self.allow:
                if rule.is_authorized(request_name, actor, subject, workflow):
                    return True
            return False

        return True
def parse_auth_config(path: Union[str, bytes, "os.PathLike[str]"]) -> Dict[str, InstanceAuthorizationConfig]:
    """Load per-instance authorization settings from a YAML file.

    The document must be a mapping from instance name to a mapping of
    ``InstanceAuthorizationConfig`` fields.

    Args:
        path: Location of the YAML file to read.

    Returns:
        A dictionary mapping each instance name found in the file to the
        ``InstanceAuthorizationConfig`` built from its entry.

    Raises:
        ValueError: If the document is empty or is not a mapping, or if the
            entry for an instance is not a mapping.

    Errors from opening the file, from YAML parsing, and from constructing
    ``InstanceAuthorizationConfig`` are not caught here and propagate to the
    caller.
    """
    with open(path) as config_file:
        config = yaml.safe_load(config_file)

    # An empty document loads as None, and scalars/lists are valid YAML too.
    # Reject them explicitly rather than failing on ``.items()`` with an
    # AttributeError that does not mention the config file.
    if not isinstance(config, dict):
        raise ValueError(
            f"Authorization config {path!r} must contain a mapping of instance names "
            f"to settings, got {type(config).__name__}"
        )

    parsed: Dict[str, InstanceAuthorizationConfig] = {}
    for instance_name, instance_config in config.items():
        # ``**`` needs a mapping; name the offending instance instead of
        # surfacing a bare TypeError.
        if not isinstance(instance_config, dict):
            raise ValueError(
                f"Authorization config for instance {instance_name!r} in {path!r} must be "
                f"a mapping, got {type(instance_config).__name__}"
            )
        parsed[instance_name] = InstanceAuthorizationConfig(**instance_config)
    return parsed