セキュリティ系の勉強・その他開発メモとか雑談. Twitter, ブログカテゴリ一覧
本ブログはあくまでセキュリティに関する情報共有の一環として作成したものであり,公開されているシステム等に許可なく実行するなど、違法な行為を助長するものではありません.

k8s resource watchする

//

あらまし

PodとかNodeとかリソースの更新を監視したかったです。 (消すコードをぱぱっと持ってきたのでもしかしたら変な部分があるかもしれません。雰囲気でも伝われば幸いです)

雑にやったこと

Server pushってやつで、クライアント側ではfetchで呼び出してデータを受け取る感じです。(streamってやつかと)
まず、echoを使っていてこのメソッドをハンドラとして公開するイメージです。

func (h *WatcherHandler) WatchResources(c echo.Context) error {
    ctx := c.Request().Context()
 
    c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
    c.Response().WriteHeader(http.StatusOK)
    // Start to watch and do server push
    err := h.service.WatchResources(ctx, c.Response())

    if err != nil {
        klog.Errorf("closed to watch resources: %+v", err)
        return echo.NewHTTPError(http.StatusInternalServerError)
    }
    return nil
}

肝心のwatchする部分はこんな感じ。もう消すコードだったのでちょっと雑に貼ります、、

type ResponseStream interface {
    io.Writer
    http.Flusher
}

func (s *Service) WatchResources(ctx context.Context, stream ResponseStream) error {
  podsWatcher := cache.NewListWatchFromClient(s.client.CoreV1().RESTClient(), string(Pods), corev1.NamespaceAll, fields.Everything())
    _, podsController := cache.NewInformer(podsWatcher, &corev1.Pod{}, 0, createResourceEventHandlerFuncs(watchEventQ, Pods))
  go podsController.Run(ctx.Done())
  <-ctx.Done()
  return xerrors.Errorf("terminated to watch: %v", ctx.Err())
}

func createResourceEventHandlerFuncs(weq *watchEventQueue, r resourceKind) cache.ResourceEventHandlerFuncs {
    return cache.ResourceEventHandlerFuncs{
        AddFunc:    AddedEventHandler(weq, r),
        UpdateFunc: ModifiedEventHandler(weq, r),
        DeleteFunc: DeletedEventHandler(weq, r),
    }
}

func AddedEventHandler(weq *watchEventQueue, r resourceKind) func(obj interface{}) {
    return func(obj interface{}) {
        // ここで受け取ったイベントを処理する
    }
}
...

server pushする部分はこんな感じ

func watchAndFlushQueue(obj interface{}, stream ResponseStream) error {
    enc := json.NewEncoder(stream)
    for {
        if weq.IsEmpty() {
            continue
        }
        if err := enc.Encode(obj); err != nil {
            return xerrors.Errorf("encode a WatchEvent in the queue: %w", err)
        }
        stream.Flush() // これでクライアントにpushされる
    }
}

このコードを使わなかった訳

cache.NewInformerの部分では「list呼び出し→watch監視開始」と実行されます。これは、resourceVersionフィールドの値をlistで取得してwatchでそれを使うことで、無駄な昔の情報をwatchで取得しないようにとかに使います。またこのメソッドで生成されるやつは、watch接続が途切れた場合に、自動で復帰もしてくれる(はず)の優しいやつです。
それが故に、この一連の動作を再度呼び出した時に再度listから実行してしまうことが予想されます。これは今回の私の目的にあっていませんでした。また、resourceVersionフィールドは外側から指定することができず完全に隠されているので、クライアント側からresourceVersionを指定して、好きな位置から情報取得をすることができません。
これらにより、便利ではあるんですが、今回は上記の実装は諦めました。