AWS OSS製の高速Cluster Autoscaler Karpenter

今回の記事はリクルートアドベントカレンダー2021の10日目の記事です。

こんにちは。スタディサプリ ENGLISH SREグループの木村です。

re:Invent2021で AWS OSS製のCluster Autoscaler KarpenterがProduction readyになったことをがアナウンスされました。
『スタディサプリENGLISH』では基盤にkubernetesを採用しており、今回導入ができないか検証をした記録です。
1)現在はauto scalingにはspotを利用しており別途記事になっているので興味があればこちらも参照ください

Karpenterとは?

公式の説明では下記のように説明されています。

Karpenter automatically launches just the right compute resources to handle your cluster's applications. It is designed to let you take full advantage of the cloud with fast and simple compute provisioning for Kubernetes clusters.

KubernetesではPodという単位でアプリケーションをデプロイすることができますが、実際にPodを起動するためにはは土台となるコンピューティングリソース2)AWSでの実際のリソースとしてはEC2,kubernetesの単位としてはNodeが必要です。
KarpenterはKubernetesに反映したPodの数に合わせて、Nodeの数を自動で調整してくれるソフトウェアです。
Nodeの数は固定で運用することも出来ますが、クラウドではリソースは従量課金なので必要な時に必要な数のNodeだけを確保することでお金の節約が出来ます。

Kubernetesには公式でCluster Autoscaler3)https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/aws/README.mdclusterのsizeを調整してくれる機能がありますが、
こちらはmanaged node groupとauto scaling group経由でclusterのsizeを調整しています。
しかし、Karpenterはauto scaling groupに依存せずにclusterのsizeを調整することで更なる高速化をしているそうです。4)既存のCluster AutoscalerとKarpenterの違いはこの動画でも紹介されていますhttps://twitter.com/toricls/status/1465782626212483074?s=20

動かしてみる

Karpenterはhelm chartが用意されており、既存のclusterにhelm installをすることで導入が可能です。
『スタディサプリENGLISH』ではTerraformで環境構築、manifestをArgoCDを利用してGitOpsを使って管理しているのでArgoCDを使った方法で構築していきます。
2021/12月執筆時点のv0.5.1 で構築しています。
5)環境に関して詳しく知りたい方は以前に書いたAmazon EKSでのArgoCDを使ったGitOps CDに詳しく書いてあるので是非読んでみてください。

Terraform

公式Docはeksctlで作成していますが、今回はterraformで作成してみたいと思います。

locals {
  cluster_name = "my-eks-cluster"
}
module "vpc" {
  source                 = "terraform-aws-modules/vpc/aws"
  name                   = local.cluster_name
  cidr                   = "10.0.0.0/16"
  azs                    = ["ap-northeast-1a", "ap-northeast-1c"]
  private_subnets        = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  public_subnets         = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
  enable_nat_gateway     = true
  single_nat_gateway     = true
  one_nat_gateway_per_az = false
  private_subnet_tags = {
    ## Auto Discoveryのために名前をつける
    "kubernetes.io/cluster/${local.cluster_name}" = "owned"
  }
}
data "aws_eks_cluster" "eks" {
  name = module.eks.cluster_id
}
data "aws_eks_cluster_auth" "eks" {
  name = module.eks.cluster_id
}
provider "kubernetes" {
  host                   = data.aws_eks_cluster.eks.endpoint
  cluster_ca_certificate = base64decode(data.aws_eks_cluster.eks.certificate_authority.0.data)
  token                  = data.aws_eks_cluster_auth.eks.token
}
module "eks" {
  source           = "terraform-aws-modules/eks/aws"
  cluster_version  = "1.21"
  write_kubeconfig = false
  cluster_name     = local.cluster_name
  vpc_id           = module.vpc.vpc_id
  subnets          = module.vpc.private_subnets
  # IRSAを有効化してservice accountベースで動かす
  enable_irsa = true
  # 初回起動時にargocdとKarpenterを動かすためのNode
  worker_groups = [
    {
      instance_type       = "m5.large"
      asg_max_size        = 3
      additional_userdata = <<EOF
yum install -y https://s3.amazonaws.com/ec2-downloads-windows/SSMAgent/latest/linux_amd64/amazon-ssm-agent.rpm
systemctl status amazon-ssm-agent
systemctl enable amazon-ssm-agent
systemctl start amazon-ssm-agent
EOF
    }
  ]
  workers_additional_policies = [
    "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore",
  ]
}
# Karpenterで指定するinstance profileを指定している
resource "aws_iam_instance_profile" "Karpenter" {
  name = "KarpenterNodeInstanceProfile-${module.eks.cluster_id}"
  role = module.eks.worker_iam_role_name
}
# IRSAで使うservice accountに設定するiam roleの設定
module "iam_assumable_role_Karpenter" {
  source                        = "terraform-aws-modules/iam/aws//modules/iam-assumable-role-with-oidc"
  version                       = "4.7.0"
  create_role                   = true
  role_name                     = "Karpenter-controller-${local.cluster_name}"
  provider_url                  = module.eks.cluster_oidc_issuer_url
  oidc_fully_qualified_subjects = ["system:serviceaccount:Karpenter:Karpenter"]
}
# Karpernterの実行に必要なpolicy
resource "aws_iam_role_policy" "Karpenter_contoller" {
  name = "Karpenter-policy-${local.cluster_name}"
  role = module.iam_assumable_role_Karpenter.iam_role_name
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "ec2:CreateLaunchTemplate",
          "ec2:CreateFleet",
          "ec2:RunInstances",
          "ec2:CreateTags",
          "iam:PassRole",
          "ec2:TerminateInstances",
          "ec2:DescribeLaunchTemplates",
          "ec2:DescribeInstances",
          "ec2:DescribeSecurityGroups",
          "ec2:DescribeSubnets",
          "ec2:DescribeInstanceTypes",
          "ec2:DescribeInstanceTypeOfferings",
          "ec2:DescribeAvailabilityZones",
          "ssm:GetParameter"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
    ]
  })
}

Chart

helm chartのinstallは公式repoに書いてあり、先ほど作ったterraformのresourceの値を埋めていきます。

今回の場合はGitOpsで管理していくので、公式のコマンドをArgoCDのApplicationの形に合わせて書いていきます。

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  finalizers:
    - resources-finalizer.argocd.argoproj.io
  name: Karpenter
  namespace: argocd
spec:
  destination:
    namespace: Karpenter
    server: https://kubernetes.default.svc
  project: defeault
  source:
    chart: Karpenter
    helm:
      parameters:
        - name: serviceAccount.annotations.eks\.amazonaws\.com/role-arn
          value: "irsaに使うiam roneのarn"
        - name: controller.clusterName
          value: "ここに作成したcluster nameを入れる"
        - name: controller.clusterEndpoint
          value: "作成したclusterのcluster endpoint"
      releaseName: Karpenter
    repoURL: https://charts.Karpenter.sh
    targetRevision: 0.5.1
  syncPolicy:
    automated:
      prune: false

注意点としてはhelmのparameterにわたすときにannotations以降の.をchartのyamlのobjectのkeyとして認識されてしまうのでescapeをしてあります。
また、karpeter自体に与えられるparameterはclusterNameやclusterEndpointなど、AWSではなくkubernetes自体に依存するものになっていて、今後AWS以外のCloud Providerの実装ができるようになっていることを感じられます。

CRD

KarpenterはNodeのProvisionをする単位をProvisionerというCRDを使って管理しています。

今回は開発環境でspot instanceを使ってauto scalingをしたいと思っているので下記のようなyamlを用意して反映しました。

apiVersion: Karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: default
spec:
  requirements:
    - key: "node.kubernetes.io/instance-type" 
      operator: In
      values: ["m5.large", "m5.2xlarge"]
    - key: "topology.kubernetes.io/zone" 
      operator: In
      values: ["ap-northeast-1a", "ap-northeast-1c"]
    - key: "kubernetes.io/arch" 
      operator: In
      values: ["amd64"]
    - key: "Karpenter.sh/capacity-type"
      operator: In
      values: ["spot"]
  ttlSecondsAfterEmpty: 30
  provider:
    instanceProfile: KarpenterNodeInstanceProfile-my-eks-cluster

項目の説明

簡単にですが項目を説明していきます。

spec

  • specの箇所ではclusterに参加させるNodeの種類などを指定することができます。
  • requirements
    requirementsはアサインされるNodeの種類を設定することができます。
    node.kubernetes.io/instance-typeはインスタンスタイプ、topology.kubernetes.io/zoneはavalvility zone、archはcpuの種類(デフォルトはamd64)
    Karpenter.sh/capacity-typeはinstanceのtype(デフォルトはspot)
  • https://Karpenter.sh/docs/aws/constraints/

Provider

  • Providerの箇所ではCloud Provider6)現状ではAWSのみ対応にわたすパラメーターを記述します。
  • instanceProfile は必須項目EC2で扱うinstance profileを渡しています。
  • EC2の疎通に必要なsecurity groupやsubnetはKarpenterが kubernetes.io/cluster/{clusterの名前} でtagをつけているとAuto Discoveryをしてくれます。
  • 今回はinstanceProfileの設定のみを利用しましたが、動かすsubnetの指定を行うことができる SubnetSelector や Nodeに別途のsecutiry groupを付与することができる SecurityGroupSelector などの設定があるのでぜひ公式docで別の設定を読んでみてください。

後は実際にdeploymentなどでpodを立ち上げると、Karpenterが自動でNodeをauto scalingをしてくれます。
簡単ですね!

KarpenterのProvisioning処理を見ていく

導入も簡単で非常に早いscalingが行えるKarpenterですが、何故こんなに早いのでしょうか?
前述の通りKarpenterはAWSのCluster Autoscalerと違いAuto scaling groupに依存しないことによって高速化をしています。
興味を持ったので、NodeのprovisioningとPodのschedulingの処理を少し細かく見てみました。

Provisioning

  • 実際にPodのSchedulingとNodeの立ち上げをしている箇所を見ていきます。
func (p *Provisioner) provision(ctx context.Context) (err error) {
    // Wait for a batch of pods
    pods := p.Batch(ctx)
    // Communicate the result of the provisioning loop to each of the pods.
    defer func() {
        for i := 0; i < len(pods); i++ {
            select {
            case p.results <- err: // Block until result is communicated
            case <-p.done: // Leave if closed
            }
        }
    }()
    // Separate pods by scheduling constraints
    schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
    if err != nil {
        return fmt.Errorf("solving scheduling constraints, %w", err)
    }
    // Launch capacity and bind pods
    for _, schedule := range schedules {
        packings, err := p.packer.Pack(ctx, schedule.Constraints, schedule.Pods)
        if err != nil {
            return fmt.Errorf("binpacking pods, %w", err)
        }
        for _, packing := range packings {
            if err := p.launch(ctx, schedule.Constraints, packing); err != nil {
                logging.FromContext(ctx).Error("Could not launch node, %s", err.Error())
                continue
            }
        }
    }
    return nil
}
func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*string, error) {
    // Default to on-demand unless constrained otherwise or if flexible to spot and
    // on-demand. This code assumes two options: {spot, on-demand}, which is enforced
    // by constraints.Constrain(). Spot may be selected by constraining the provisioner,
    // or using nodeSelectors, required node affinity, or preferred node affinity.
    capacityType := v1alpha1.CapacityTypeOnDemand
    if capacityTypes := constraints.Requirements.CapacityTypes(); len(capacityTypes) == 0 {
        return nil, fmt.Errorf("invariant violated, must contain at least one capacity type")
    } else if len(capacityTypes) == 1 {
        capacityType = capacityTypes.UnsortedList()[0]
    }
    // Get Launch Template Configs, which may differ due to GPU or Architecture requirements
    launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, constraints, instanceTypes, capacityType)
    if err != nil {
        return nil, fmt.Errorf("getting launch template configs, %w", err)
    }
    // Create fleet
    createFleetOutput, err := p.ec2api.CreateFleetWithContext(ctx, &ec2.CreateFleetInput{
        Type:                  aws.String(ec2.FleetTypeInstant),
        LaunchTemplateConfigs: launchTemplateConfigs,
        TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
            DefaultTargetCapacityType: aws.String(capacityType),
            TotalTargetCapacity:       aws.Int64(int64(quantity)),
        },
        TagSpecifications: []*ec2.TagSpecification{
            {
                ResourceType: aws.String(ec2.ResourceTypeInstance),
                Tags:         v1alpha1.MergeTags(v1alpha1.ManagedTagsFor(injection.GetOptions(ctx).ClusterName), constraints.Tags),
            },
        },
        // OnDemandOptions are allowed to be specified even when requesting spot
        OnDemandOptions: &ec2.OnDemandOptionsRequest{AllocationStrategy: aws.String(ec2.FleetOnDemandAllocationStrategyLowestPrice)},
        // SpotOptions are allowed to be specified even when requesting on-demand
        SpotOptions: &ec2.SpotOptionsRequest{AllocationStrategy: aws.String(ec2.SpotAllocationStrategyCapacityOptimizedPrioritized)},
    })
    if err != nil {
        return nil, fmt.Errorf("creating fleet %w", err)
    }
    instanceIds := combineFleetInstances(*createFleetOutput)
    if len(instanceIds) == 0 {
        return nil, combineFleetErrors(createFleetOutput.Errors)
    } else if len(instanceIds) != quantity {
        logging.FromContext(ctx).Errorf("Failed to launch %d EC2 instances out of the %d EC2 instances requested: %s",
            quantity-len(instanceIds), quantity, combineFleetErrors(createFleetOutput.Errors).Error())
    }
    return instanceIds, nil
}
func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constraints, packing *binpacking.Packing) error {
    if err := p.verifyResourceLimits(ctx, p.Provisioner); err != nil {
        return fmt.Errorf("limits exceeded, %w", err)
    }
    packedPods := queueFor(packing.Pods)
    return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
        node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
        node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
        return p.bind(ctx, node, <-packedPods)
    })
}
func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
    defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))()
    // Add the Karpenter finalizer to the node to enable the termination workflow
    node.Finalizers = append(node.Finalizers, v1alpha5.TerminationFinalizer)
    // Taint Karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler
    // from scheduling pods before we're able to bind them ourselves. The kube
    // scheduler has an eventually consistent cache of nodes and pods, so it's
    // possible for it to see a provisioned node before it sees the pods bound
    // to it. This creates an edge case where other pending pods may be bound to
    // the node by the kube scheduler, causing OutOfCPU errors when the
    // binpacked pods race to bind to t
    he same node. The system eventually
    // heals, but causes delays from additional provisioning (thrash). This
    // taint will be removed by the node controller when a node is marked ready.
    node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
        Key:    v1alpha5.NotReadyTaintKey,
        Effect: v1.TaintEffectNoSchedule,
    })
    // Idempotently create a node. In rare cases, nodes can come online and
    // self register before the controller is able to register a node object
    // with the API server. In the common case, we create the node object
    // ourselves to enforce the binding decision and enable images to be pulled
    // before the node is fully Ready.
    if _, err := p.coreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
        if !errors.IsAlreadyExists(err) {
            return fmt.Errorf("creating node %s, %w", node.Name, err)
        }
    }
    // Bind pods
    var bound int64
    workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) {
        pod := pods[i]
        binding := &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}
        if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
            logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pod.Namespace, pod.Name, node.Name, err.Error())
        } else {
            atomic.AddInt64(&bound, 1)
        }
    })
    logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", bound, node.Name)
    return nil
}

まとめ

AWS OSS製のCluster Autoscalerを今回は試してみました。
scalingの高速で、helmでinstallするだけで使える簡単さ、またterraformとkubernetesのmanifestで分断しがちなsecurity groupやsubnetなどのを設定をAuto Discoveryできるようになっているなど、非常に運用などでも扱いやすい洗練されたものを感じました。
今回はあまりのscaleの速さに内部コードを読んでみたところ、スマートながらも力強い手段で実装されていて驚きましたが、
kubernetesの素晴らしさはコアな部分にもサードパーティのツールから疎結合なままカスタマイズ性を再確認したので、よりkubernetesを活用、よい使いやすい基盤にしていきたいという気持ちが高まりました。

脚注

脚注
1 現在はauto scalingにはspotを利用しており別途記事になっているので興味があればこちらも参照ください
2 AWSでの実際のリソースとしてはEC2,kubernetesの単位としてはNode
3 https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/aws/README.md
4 既存のCluster AutoscalerとKarpenterの違いはこの動画でも紹介されていますhttps://twitter.com/toricls/status/1465782626212483074?s=20
5 環境に関して詳しく知りたい方は以前に書いたAmazon EKSでのArgoCDを使ったGitOps CDに詳しく書いてあるので是非読んでみてください。
6 現状ではAWSのみ対応