Skip to content

Commit

Permalink
[k8s] Fix incluster auth after multi-context support (#4014)
Browse files Browse the repository at this point in the history
* Make incluster auth work

* lint

* rename

* rename

* pop allowed_contexts from config

* lint

* comments

* comments

* lint
  • Loading branch information
romilbhardwaj authored Sep 29, 2024
1 parent dacf273 commit e6a3b83
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 43 deletions.
5 changes: 5 additions & 0 deletions sky/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
secret_field_name = clouds.Kubernetes().ssh_key_secret_field_name
context = config['provider'].get(
'context', kubernetes_utils.get_current_kube_config_context_name())
if context == kubernetes_utils.IN_CLUSTER_REGION:
# If the context is set to IN_CLUSTER_REGION, we are running in a pod
# with in-cluster configuration. We need to set the context to None
# to use the mounted service account.
context = None
namespace = config['provider'].get(
'namespace',
kubernetes_utils.get_kube_config_context_namespace(context))
Expand Down
42 changes: 35 additions & 7 deletions sky/clouds/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,24 @@ def _log_skipped_contexts_once(cls, skipped_contexts: Tuple[str,
'Ignoring these contexts.')

@classmethod
def _existing_allowed_contexts(cls) -> List[str]:
"""Get existing allowed contexts."""
def _existing_allowed_contexts(cls) -> List[Optional[str]]:
"""Get existing allowed contexts.
If None is returned in the list, it means that we are running in a pod
with in-cluster auth. In this case, we specify None context, which will
use the service account mounted in the pod.
"""
all_contexts = kubernetes_utils.get_all_kube_config_context_names()
if all_contexts is None:
if len(all_contexts) == 0:
return []
if all_contexts == [None]:
# If only one context is found and it is None, we are running in a
# pod with in-cluster auth. In this case, we allow it to be used
# without checking against allowed_contexts.
# TODO(romilb): We may want check in-cluster auth against
# allowed_contexts in the future by adding a special context name
# for in-cluster auth.
return [None]
all_contexts = set(all_contexts)

allowed_contexts = skypilot_config.get_nested(
Expand Down Expand Up @@ -164,7 +177,15 @@ def regions_with_offering(cls, instance_type: Optional[str],
del accelerators, zone, use_spot # unused
existing_contexts = cls._existing_allowed_contexts()

regions = [clouds.Region(context) for context in existing_contexts]
regions = []
for context in existing_contexts:
if context is None:
# If running in-cluster, we allow the region to be set to the
# singleton region since there is no context name available.
regions.append(clouds.Region(
kubernetes_utils.IN_CLUSTER_REGION))
else:
regions.append(clouds.Region(context))

if region is not None:
regions = [r for r in regions if r.name == region]
Expand Down Expand Up @@ -541,13 +562,20 @@ def instance_type_exists(self, instance_type: str) -> bool:
def validate_region_zone(self, region: Optional[str], zone: Optional[str]):
if region == self._LEGACY_SINGLETON_REGION:
# For backward compatibility, we allow the region to be set to the
# legacy singletonton region.
# legacy singleton region.
# TODO: Remove this after 0.9.0.
return region, zone

if region == kubernetes_utils.IN_CLUSTER_REGION:
# If running incluster, we set region to IN_CLUSTER_REGION
# since there is no context name available.
return region, zone

all_contexts = kubernetes_utils.get_all_kube_config_context_names()
if all_contexts is None:
all_contexts = []
if all_contexts == [None]:
# If [None] context is returned, use the singleton region since we
# are running in a pod with in-cluster auth.
all_contexts = [kubernetes_utils.IN_CLUSTER_REGION]
if region not in all_contexts:
raise ValueError(
f'Context {region} not found in kubeconfig. Kubernetes only '
Expand Down
9 changes: 5 additions & 4 deletions sky/provision/kubernetes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ def _get_resource(container_resources: Dict[str, Any], resource_name: str,


def _configure_autoscaler_service_account(
namespace: str, context: str, provider_config: Dict[str, Any]) -> None:
namespace: str, context: Optional[str],
provider_config: Dict[str, Any]) -> None:
account_field = 'autoscaler_service_account'
if account_field not in provider_config:
logger.info('_configure_autoscaler_service_account: '
Expand Down Expand Up @@ -281,7 +282,7 @@ def _configure_autoscaler_service_account(
f'{created_msg(account_field, name)}')


def _configure_autoscaler_role(namespace: str, context: str,
def _configure_autoscaler_role(namespace: str, context: Optional[str],
provider_config: Dict[str, Any],
role_field: str) -> None:
""" Reads the role from the provider config, creates if it does not exist.
Expand Down Expand Up @@ -330,7 +331,7 @@ def _configure_autoscaler_role(namespace: str, context: str,

def _configure_autoscaler_role_binding(
namespace: str,
context: str,
context: Optional[str],
provider_config: Dict[str, Any],
binding_field: str,
override_name: Optional[str] = None,
Expand Down Expand Up @@ -620,7 +621,7 @@ def _configure_fuse_mounting(provider_config: Dict[str, Any]) -> None:
f'in namespace {fuse_device_manager_namespace!r}')


def _configure_services(namespace: str, context: str,
def _configure_services(namespace: str, context: Optional[str],
provider_config: Dict[str, Any]) -> None:
service_field = 'services'
if service_field not in provider_config:
Expand Down
13 changes: 8 additions & 5 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ def _check_init_containers(pod):
time.sleep(1)


def _set_env_vars_in_pods(namespace: str, context: str, new_pods: List):
def _set_env_vars_in_pods(namespace: str, context: Optional[str],
new_pods: List):
"""Setting environment variables in pods.
Once all containers are ready, we can exec into them and set env vars.
Expand Down Expand Up @@ -330,7 +331,7 @@ def _set_env_vars_in_pods(namespace: str, context: str, new_pods: List):
new_pod.metadata.name, rc, stdout)


def _check_user_privilege(namespace: str, context: str,
def _check_user_privilege(namespace: str, context: Optional[str],
new_nodes: List) -> None:
# Checks if the default user has sufficient privilege to set up
# the kubernetes instance pod.
Expand Down Expand Up @@ -366,7 +367,8 @@ def _check_user_privilege(namespace: str, context: str,
'from the image.')


def _setup_ssh_in_pods(namespace: str, context: str, new_nodes: List) -> None:
def _setup_ssh_in_pods(namespace: str, context: Optional[str],
new_nodes: List) -> None:
# Setting up ssh for the pod instance. This is already setup for
# the jump pod so it does not need to be run for it.
set_k8s_ssh_cmd = (
Expand Down Expand Up @@ -410,7 +412,7 @@ def _setup_ssh_in_pods(namespace: str, context: str, new_nodes: List) -> None:
logger.info(f'{"-"*20}End: Set up SSH in pod {pod_name!r} {"-"*20}')


def _label_pod(namespace: str, context: str, pod_name: str,
def _label_pod(namespace: str, context: Optional[str], pod_name: str,
label: Dict[str, str]) -> None:
"""Label a pod."""
kubernetes.core_api(context).patch_namespaced_pod(
Expand Down Expand Up @@ -647,7 +649,8 @@ def stop_instances(
raise NotImplementedError()


def _terminate_node(namespace: str, context: str, pod_name: str) -> None:
def _terminate_node(namespace: str, context: Optional[str],
pod_name: str) -> None:
"""Terminate a pod."""
logger.debug('terminate_instances: calling delete_namespaced_pod')
try:
Expand Down
15 changes: 8 additions & 7 deletions sky/provision/kubernetes/network_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def fill_ingress_template(namespace: str, service_details: List[Tuple[str, int,


def create_or_replace_namespaced_ingress(
namespace: str, context: str, ingress_name: str,
namespace: str, context: Optional[str], ingress_name: str,
ingress_spec: Dict[str, Union[str, int]]) -> None:
"""Creates an ingress resource for the specified service."""
networking_api = kubernetes.networking_api(context)
Expand All @@ -156,7 +156,7 @@ def create_or_replace_namespaced_ingress(
_request_timeout=kubernetes.API_TIMEOUT)


def delete_namespaced_ingress(namespace: str, context: str,
def delete_namespaced_ingress(namespace: str, context: Optional[str],
ingress_name: str) -> None:
"""Deletes an ingress resource."""
networking_api = kubernetes.networking_api(context)
Expand All @@ -171,7 +171,7 @@ def delete_namespaced_ingress(namespace: str, context: str,


def create_or_replace_namespaced_service(
namespace: str, context: str, service_name: str,
namespace: str, context: Optional[str], service_name: str,
service_spec: Dict[str, Union[str, int]]) -> None:
"""Creates a service resource for the specified service."""
core_api = kubernetes.core_api(context)
Expand Down Expand Up @@ -208,7 +208,7 @@ def delete_namespaced_service(namespace: str, service_name: str) -> None:
raise e


def ingress_controller_exists(context: str,
def ingress_controller_exists(context: Optional[str],
ingress_class_name: str = 'nginx') -> bool:
"""Checks if an ingress controller exists in the cluster."""
networking_api = kubernetes.networking_api(context)
Expand All @@ -220,7 +220,7 @@ def ingress_controller_exists(context: str,


def get_ingress_external_ip_and_ports(
context: str,
context: Optional[str],
namespace: str = 'ingress-nginx'
) -> Tuple[Optional[str], Optional[Tuple[int, int]]]:
"""Returns external ip and ports for the ingress controller."""
Expand Down Expand Up @@ -258,7 +258,7 @@ def get_ingress_external_ip_and_ports(
return external_ip, None


def get_loadbalancer_ip(context: str,
def get_loadbalancer_ip(context: Optional[str],
namespace: str,
service_name: str,
timeout: int = 0) -> Optional[str]:
Expand All @@ -284,7 +284,8 @@ def get_loadbalancer_ip(context: str,
return ip


def get_pod_ip(context: str, namespace: str, pod_name: str) -> Optional[str]:
def get_pod_ip(context: Optional[str], namespace: str,
pod_name: str) -> Optional[str]:
"""Returns the IP address of the pod."""
core_api = kubernetes.core_api(context)
pod = core_api.read_namespaced_pod(pod_name,
Expand Down
Loading

0 comments on commit e6a3b83

Please sign in to comment.