なになれ

IT系のことを記録していきます

Kubernetesのコアコンセプト「Reconciliation Loop」をソースコードリーディングでより深く理解する

Infra Study Meetup #2において、Kubernetesの紹介を中心とした基調講演が行われました。
Kubernetesに興味を持っている自分にとっては、Kubernetesの未来を感じさせてくれる講演でした。
youtu.be

この講演の中でKubernetesに未来を感じる部分として、ControllerにおけるReconciliation Loopが紹介されました。
Reconciliation LoopはKubernetes界隈では良く紹介される単語で、Kubernetes利用者にとって理解すべきものになっているかなと思います。

今回は、このReconciliation Loopとは何か、Kubernetesソースコードを読んで、どのように実現されているか、より深くReconciliation Loopを理解したいと思います。

Kubernetesを構成するコンポーネントについて

はじめに、Kubernetesを構成するコンポーネントについて簡単に紹介します。
f:id:hi1280:20200606155504p:plain

Kubernetesのリソースに対してのインタフェースをAPI Serverが担当しています。
etcdはKubernetesクラスタ情報を保持するデータストアです。
SchedulerはPodをどのノードに配置するかを文字通りスケジュールするためのコンポーネントです。
Schedulerの命令を受けて、実際にはKubeletがコンテナを起動します。

Controller Managerが今回の話に関係するコンポーネントです。
Controller Managerには、複数のControllerが存在しています。
Controllerは、それぞれのControllerの責務に応じて、常にKubernetesのリソースが希望通りになるように動き続けています。
例えば、Pod数を希望通りにするといった責務です。

Reconciliation Loopとは

責務を忠実に実行するためにControllerはループ処理の中でKubernetesのリソースが希望通りになるように動き続けています。
これがReconciliation Loopです。
このループ処理の中で、現在の状態を確認する、希望と現在を比較する、差異があれば希望通りになるように処理するといった動きをしています。
これが本当にそのようになっているかKubernetesソースコードを読んで確認してみます。

ソースコードリーディングによるReconciliation Loopの理解

いくつかControllerがありますが、今回はReplicaSet Controllerのソースコードを読みます。

ReplicaSet Controllerを起動する

ReplicaSet Controllerがどのように実行されるかを見てみます。
前述のコンポーネントの構成によれば、Controller Managerから実行されているはずです。

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    ...
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

https://github.com/kubernetes/kubernetes/blob/v1.18.3/cmd/kube-controller-manager/app/apps.go#L73

kube-controller-managerというコンポーネントの中で、NewReplicaSetController関数でReplicaSet Controllerを初期化しています。
初期化後にRun関数でReplicaSet Controllerを実行しています。
kube-controller-managerはController Managerに含まれるコンポーネントです。
そのほかに、kube-controller-managerの中で、Deployment ControllerやStatefulSet Controllerなど、Kubernetesで良く利用されるリソースのControllerが実行されています。

現在の状態を確認する

NewReplicaSetController関数の中身になります。
ReplicaSet Controllerが現在の状態を確認する処理が含まれています。
具体的にはReplicaSetやPodといったKubernetesのリソース情報を取得する部分になります。

func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
...
    return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
        },
    )
}
...
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
...
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addRS,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.deleteRS,
    })
...
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
...
}

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L143

Informerというモジュールを介して、ReplicaSetのリソースが追加、更新、削除された時にイベントドリブンで処理しています。
Podもイベントドリブンで同じように処理しています。

...
func (rsc *ReplicaSetController) addRS(obj interface{}) {
    rs := obj.(*apps.ReplicaSet)
    klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
    rsc.enqueueRS(rs)
}
...
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
    key, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }

    rsc.queue.Add(key)
}
...

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L287

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L267

これはReplicaSetの部分になりますが、最終的にrsc.queue.Addという箇所で、queueにリソースの情報を入れています。
queueを介して、リソースの現在の状態を把握する仕組みになっていることが分かります。

希望通りになるように処理する

ReplicaSetControllerのRun関数の実行内容です。
queueに登録されるリソースの情報をどのように処理しているかといった部分になります。

func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
...
    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L176

go wait.Until(rsc.worker, time.Second, stopCh)でworker関数が複数スレッドで実行されます。

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L517

worker関数で実行しているprocessNextWorkItem関数は無限ループになっています。
この部分が希望通りになるまで動き続けるControllerのループ処理です。
rsc.queue.Get()で前述のqueueからリソースの情報を取得して、実際のControllerの処理(rsc.syncHandler関数)を実行しています。
希望通りになるように処理しているのは、この関数内で実装されています。つまり、ReplicaSet Controllerの実装に任されています。
ReplicaSet Controllerの場合はsyncReplicaSet関数、manageReplicas関数になります。

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L647

https://github.com/kubernetes/kubernetes/blob/v1.18.3/pkg/controller/replicaset/replica_set.go#L544

ここまでの内容でControllerがどのように動いているかが何となく分かったのではないでしょうか。
Deployment Controllerにおいてもinformerによるqueueへの登録とqueueを処理し続けるループ処理というベースの処理は変わりません。

Reconciliation Loopがどう活用されているか

Kubernetesでは、Kubernetesを拡張する目的で、利用者が独自のControllerを作成できるようになっています。
独自のControllerを作る場合においても今回紹介した仕組みで、Reconciliation Loopを前提としたプログラムを利用者が作ることになります。
その公式のサンプルがこちらです。

github.com

また、今回紹介した仕組みを含めてControllerを作成するためのフレームワークが提供されています。

github.com

github.com

まとめ

informerによるイベントドリブンなqueueへの登録とqueueを処理し続けるループ処理がControllerのReconciliation Loopを実現している処理です。
この処理が現在のリソースの状態を常に把握しながら、希望に応じた処理を実行していくことを実現しています。
処理内容は個別のControllerの実装に委ねられています。

参考