Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for providing Log Analytics Id and Key, and Custom columns in the pod spec #352

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions charts/virtual-kubelet/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ spec:
value: /etc/virtual-kubelet/cert.pem
- name: APISERVER_KEY_LOCATION
value: /etc/virtual-kubelet/key.pem
- name: APISERVER_CA_CERT_LOCATION
value: /etc/kubernetes/certs/ca.crt
- name: VKUBELET_POD_IP
valueFrom:
fieldRef:
Expand Down
8 changes: 5 additions & 3 deletions client/aci/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,18 @@ type ExtensionType string

// Supported extension types
const (
ExtensionTypeKubeProxy ExtensionType = "kube-proxy"
ExtensionTypeRealtimeMetrics ExtensionType = "realtime-metrics"
ExtensionTypeKubeProxy ExtensionType = "kube-proxy"
ExtensionTypeRealtimeMetrics ExtensionType = "realtime-metrics"
ExtensionTypeFirstPartyLogger ExtensionType = "firstpartylogger"
)

// ExtensionVersion is an enum type for defining supported extension versions
type ExtensionVersion string

// Supported extension version
const (
ExtensionVersion1_0 ExtensionVersion = "1.0"
ExtensionVersion1_0 ExtensionVersion = "1.0"
FirstPartyLoggerDefaultVersion ExtensionVersion = ""
)

// Supported kube-proxy extension constants
Expand Down
131 changes: 105 additions & 26 deletions provider/aci.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ const (

virtualKubeletDNSNameLabel = "virtualkubelet.io/dnsnamelabel"

virtualKubeletLogAnalyticsWorkspaceId = "virtualkubelet.io/loganalyticsworkspaceid"
virtualKubeletLogAnalyticsWorkspaceKey = "virtualkubelet.io/loganalyticsworkspacekey"
virtualKubeletACILoggerCustomColumns = "virtualkubelet.io/aciloggercustomcolumns"

subnetDelegationService = "Microsoft.ContainerInstance/containerGroups"
// Parameter names defined in azure file CSI driver, refer to
// https://github.com/kubernetes-sigs/azurefile-csi-driver/blob/master/docs/driver-parameters.md
Expand Down Expand Up @@ -79,32 +83,33 @@ const (

// ACIProvider implements the virtual-kubelet provider interface and communicates with Azure's ACI APIs.
type ACIProvider struct {
aciClient *aci.Client
resourceManager *manager.ResourceManager
resourceGroup string
region string
nodeName string
operatingSystem string
cpu string
memory string
pods string
gpu string
gpuSKUs []aci.GPUSKU
internalIP string
daemonEndpointPort int32
diagnostics *aci.ContainerGroupDiagnostics
subnetName string
subnetCIDR string
vnetSubscriptionID string
vnetName string
vnetResourceGroup string
clusterDomain string
kubeProxyExtension *aci.Extension
realtimeMetricsExtension *aci.Extension
kubeDNSIP string
extraUserAgent string
retryConfig client.HTTPRetryConfig
tracker *PodsTracker
aciClient *aci.Client
resourceManager *manager.ResourceManager
resourceGroup string
region string
nodeName string
operatingSystem string
cpu string
memory string
pods string
gpu string
gpuSKUs []aci.GPUSKU
internalIP string
daemonEndpointPort int32
diagnostics *aci.ContainerGroupDiagnostics
subnetName string
subnetCIDR string
vnetSubscriptionID string
vnetName string
vnetResourceGroup string
clusterDomain string
kubeProxyExtension *aci.Extension
realtimeMetricsExtension *aci.Extension
firstPartyLoggerExtension *aci.Extension
kubeDNSIP string
extraUserAgent string
retryConfig client.HTTPRetryConfig
tracker *PodsTracker

metrics.ACIPodMetricsProvider
}
Expand Down Expand Up @@ -531,6 +536,44 @@ func (p *ACIProvider) setupNetwork(auth *client.Authentication) error {
return nil
}

// Parses custom columns from a pod. Also adds a custom column called podName.
func validateAndGetAciLoggerColumns(ctx context.Context, pod *v1.Pod) string {
var sb strings.Builder
if aciLoggerCustomColumns := pod.Annotations[virtualKubeletACILoggerCustomColumns]; aciLoggerCustomColumns != "" {
var aciLoggerCustomColumnsMap map[string]string
err := json.Unmarshal([]byte(aciLoggerCustomColumns), &aciLoggerCustomColumnsMap)
if err != nil {
log.G(ctx).Infof("Error while unmarshalling annotations: %v with value: %v for pod: %v", virtualKubeletACILoggerCustomColumns, aciLoggerCustomColumns, pod.Name)
}

fmt.Fprintf(&sb, "{\"podName\" : \"%v\"", pod.Name)
for key, element := range aciLoggerCustomColumnsMap {
fmt.Fprintf(&sb, ", \"%v\" : \"%v\"", key, element)
}

fmt.Fprintf(&sb, "}")
}

return sb.String()
}

func getFirstPartyLoggerExtension(ctx context.Context, pod *v1.Pod) (*aci.Extension, error) {
extension := aci.Extension{
Name: "vk-firstparty-logger",
Properties: &aci.ExtensionProperties{
Type: aci.ExtensionTypeFirstPartyLogger,
Version: aci.FirstPartyLoggerDefaultVersion,
Settings: map[string]string{
"containername": ".*",
"payloadtype": "aca",
"custom_metadata": validateAndGetAciLoggerColumns(ctx, pod)},
ProtectedSettings: map[string]string{},
},
}

return &extension, nil
}

func getRealtimeMetricsExtension() (*aci.Extension, error) {
extension := aci.Extension{
Name: "vk-realtime-metrics",
Expand Down Expand Up @@ -745,6 +788,14 @@ func (p *ACIProvider) CreatePod(ctx context.Context, pod *v1.Pod) error {
containerGroup.ContainerGroupProperties.Extensions = append(containerGroup.ContainerGroupProperties.Extensions, p.realtimeMetricsExtension)
}

firstPartyLoggerExtension, err := getFirstPartyLoggerExtension(ctx, pod)
if err != nil {
log.G(context.TODO()).Error("error creating first party logger extension: %v", err)
} else {
p.firstPartyLoggerExtension = firstPartyLoggerExtension
containerGroup.ContainerGroupProperties.Extensions = append(containerGroup.ContainerGroupProperties.Extensions, p.firstPartyLoggerExtension)
}

log.G(ctx).Infof("start creating pod %v", pod.Name)
// TODO: Run in a go routine to not block workers, and use taracker.UpdatePodStatus() based on result.
return p.createContainerGroup(ctx, pod.Namespace, pod.Name, &containerGroup)
Expand Down Expand Up @@ -897,8 +948,36 @@ func formDNSSearchFitsLimits(searches []string) string {

func (p *ACIProvider) getDiagnostics(pod *v1.Pod) *aci.ContainerGroupDiagnostics {
if p.diagnostics != nil && p.diagnostics.LogAnalytics != nil && p.diagnostics.LogAnalytics.LogType == aci.LogAnlyticsLogTypeContainerInsights {
var podDiagnostics *aci.ContainerGroupDiagnostics
var err error

d := *p.diagnostics
d.LogAnalytics.Metadata[aci.LogAnalyticsMetadataKeyPodUUID] = string(pod.ObjectMeta.UID)

// If we have both the log analytics workspace id and key, add them to the provider
if logAnalyticsID := pod.Annotations[virtualKubeletLogAnalyticsWorkspaceId]; logAnalyticsID != "" {
if logAnalyticsKey := pod.Annotations[virtualKubeletLogAnalyticsWorkspaceKey]; logAnalyticsKey != "" {

msg := fmt.Sprintf("Found Log analytics Id and key in the pod spec for pod %s. Setting diagnostics: "+
"LogAnalytics ID: %s, LogAnalytis Key: %s", pod.Name, logAnalyticsID, logAnalyticsKey)
log.G(context.TODO()).Info(msg)

podDiagnostics, err = aci.NewContainerGroupDiagnostics(logAnalyticsID, logAnalyticsKey)
if err != nil {
return &d
}

podDiagnostics.LogAnalytics.LogType = aci.LogAnlyticsLogTypeContainerInsights
podDiagnostics.LogAnalytics.Metadata = map[string]string{
aci.LogAnalyticsMetadataKeyClusterResourceID: os.Getenv("CLUSTER_RESOURCE_ID"),
aci.LogAnalyticsMetadataKeyNodeName: pod.Spec.NodeName,
}
podDiagnostics.LogAnalytics.Metadata[aci.LogAnalyticsMetadataKeyPodUUID] = string(pod.ObjectMeta.UID)

d = *podDiagnostics
}
}

return &d
}
return p.diagnostics
Expand Down
150 changes: 141 additions & 9 deletions provider/aci_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* Copyright (c) Microsoft. All rights reserved.
*/
*/

package provider

Expand Down Expand Up @@ -29,14 +29,17 @@ import (
)

const (
fakeSubscription = "a88d9e8f-3cb3-456f-8f10-27395c1e122a"
fakeResourceGroup = "vk-rg"
fakeClientID = "f14193ad-4c4c-4876-a18a-c0badb3bbd40"
fakeClientSecret = "VGhpcyBpcyBhIHNlY3JldAo="
fakeTenantID = "8cb81aca-83fe-4c6f-b667-4ec09c45a8bf"
fakeNodeName = "vk"
fakeRegion = "eastus"
fakeUserIdentity = "00000000-0000-0000-0000-000000000000"
fakeSubscription = "a88d9e8f-3cb3-456f-8f10-27395c1e122a"
fakeResourceGroup = "vk-rg"
fakeClientID = "f14193ad-4c4c-4876-a18a-c0badb3bbd40"
fakeClientSecret = "VGhpcyBpcyBhIHNlY3JldAo="
fakeTenantID = "8cb81aca-83fe-4c6f-b667-4ec09c45a8bf"
fakeNodeName = "vk"
fakeRegion = "eastus"
fakeUserIdentity = "00000000-0000-0000-0000-000000000000"
fakeCluster = "AKSCluster-a88d9e8f"
fakeLogAnalyticsId = "9a7c6791-e18d-4e62-b10f-aa792493f12b"
fakeLogAnalyticsKey = "SgVkYp3s6v9y$B&E)H@MbQeThWmZq4t7w!z%C*F-JaNdRfUjXn2r5u8x/A?D(G+K"
)

// Test make registry credential
Expand Down Expand Up @@ -833,6 +836,9 @@ func createTestProvider(aadServerMocker *AADMock, aciServerMocker *ACIMock, reso
os.Setenv("AZURE_AUTH_LOCATION", file.Name())
os.Setenv("ACI_RESOURCE_GROUP", fakeResourceGroup)
os.Setenv("ACI_REGION", fakeRegion)
os.Setenv("LOG_ANALYTICS_ID", fakeLogAnalyticsId)
os.Setenv("LOG_ANALYTICS_KEY", fakeLogAnalyticsKey)
os.Setenv("CLUSTER_RESOURCE_ID", fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.ContainerService/managedClusters/%s", fakeSubscription, fakeResourceGroup, fakeCluster))

if resourceManager == nil {
resourceManager, err = manager.NewResourceManager(nil, nil, nil, nil, nil, nil)
Expand Down Expand Up @@ -1064,3 +1070,129 @@ func TestCreatePodWithReadinessProbe(t *testing.T) {
t.Fatal("Failed to create pod", err)
}
}

// Tests create pod with Log Analytics ID and Key provided as annotations
func TestCreatePodWithLogAnalyticsId(t *testing.T) {
aadServerMocker := NewAADMock()
aciServerMocker := NewACIMock()

podName := "pod-" + uuid.New().String()
podNamespace := "ns-" + uuid.New().String()

logAnalyticsId := "9a7c6791-e18d-4e62-b10f-aa792493f12b"
logAnalyticsKey := "SgVkYp3s6v9y$B&E)H@MbQeThWmZq4t7w!z%C*F-JaNdRfUjXn2r5u8x/A?D(G+K"

provider, err := createTestProvider(aadServerMocker, aciServerMocker, nil)
if err != nil {
t.Fatalf("failed to create the test provider. %s", err.Error())
return
}

aciServerMocker.OnCreate = func(subscription, resourceGroup, containerGroup string, cg *aci.ContainerGroup) (int, interface{}) {
assert.Check(t, is.Equal(fakeSubscription, subscription), "Subscription doesn't match")
assert.Check(t, is.Equal(fakeResourceGroup, resourceGroup), "Resource group doesn't match")
assert.Check(t, cg != nil, "Container group is nil")
assert.Check(t, is.Equal(podNamespace+"-"+podName, containerGroup), "Container group name is not expected")
assert.Check(t, cg.ContainerGroupProperties.Containers != nil, "Containers should not be nil")
assert.Check(t, is.Equal(1, len(cg.ContainerGroupProperties.Containers)), "1 Container is expected")
assert.Check(t, is.Equal("nginx", cg.ContainerGroupProperties.Containers[0].Name), "Container nginx is expected")
assert.Check(t, cg.ContainerGroupProperties.Containers[0].Resources.Requests != nil, "Container resource requests should not be nil")
assert.Check(t, is.Equal(cg.ContainerGroupProperties.Diagnostics.LogAnalytics.WorkspaceID, logAnalyticsId), "Workspace Id doesn't match")
assert.Check(t, is.Equal(cg.ContainerGroupProperties.Diagnostics.LogAnalytics.WorkspaceKey, logAnalyticsKey), "Workspace Key doesn't match")

return http.StatusOK, cg
}

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
Annotations: map[string]string{
virtualKubeletLogAnalyticsWorkspaceId: logAnalyticsId,
virtualKubeletLogAnalyticsWorkspaceKey: logAnalyticsKey,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1.981"),
"memory": resource.MustParse("3.49G"),
},
},
},
},
},
}

if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}

// Tests create pod with custom columns and their values provided as annotations
func TestCreatePodWithCustomColumns(t *testing.T) {
aadServerMocker := NewAADMock()
aciServerMocker := NewACIMock()

podName := "pod-" + uuid.New().String()
podNamespace := "ns-" + uuid.New().String()
revisionId := "test-1"
orchestrationId := "01"

provider, err := createTestProvider(aadServerMocker, aciServerMocker, nil)
if err != nil {
t.Fatalf("failed to create the test provider. %s", err.Error())
return
}

aciServerMocker.OnCreate = func(subscription, resourceGroup, containerGroup string, cg *aci.ContainerGroup) (int, interface{}) {
assert.Check(t, is.Equal(fakeSubscription, subscription), "Subscription doesn't match")
assert.Check(t, is.Equal(fakeResourceGroup, resourceGroup), "Resource group doesn't match")
assert.Check(t, cg != nil, "Container group is nil")
assert.Check(t, is.Equal(podNamespace+"-"+podName, containerGroup), "Container group name is not expected")
assert.Check(t, cg.ContainerGroupProperties.Containers != nil, "Containers should not be nil")
assert.Check(t, is.Equal(1, len(cg.ContainerGroupProperties.Containers)), "1 Container is expected")
assert.Check(t, is.Equal("nginx", cg.ContainerGroupProperties.Containers[0].Name), "Container nginx is expected")
assert.Check(t, cg.ContainerGroupProperties.Containers[0].Resources.Requests != nil, "Container resource requests should not be nil")

var customColumns = cg.ContainerGroupProperties.Extensions[0].Properties.Settings["custom_metadata"]
var customColumnsMap map[string]string
json.Unmarshal([]byte(customColumns), &customColumnsMap)

assert.Check(t, is.Equal(customColumnsMap["podName"], podName), "Custom column podName doesn't match")
assert.Check(t, is.Equal(customColumnsMap["revisionId"], revisionId), "Custom column revisionId doesn't match")
assert.Check(t, is.Equal(customColumnsMap["orchestrationId"], orchestrationId), "Custom column orchestrationId doesn't match")

return http.StatusOK, cg
}

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
Annotations: map[string]string{
virtualKubeletACILoggerCustomColumns: fmt.Sprintf("{\"revisionId\": \"%s\", \"orchestrationId\": \"%s\" }", revisionId, orchestrationId),
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1.981"),
"memory": resource.MustParse("3.49G"),
},
},
},
},
},
}

if err := provider.CreatePod(context.Background(), pod); err != nil {
t.Fatal("Failed to create pod", err)
}
}