k8s resource watchする
//
あらまし
PodとかNodeとかリソースの更新を監視したかったです。 (消すコードをぱぱっと持ってきたのでもしかしたら変な部分があるかもしれません。雰囲気でも伝われば幸いです)
雑にやったこと
Server push
ってやつで、クライアント側ではfetchで呼び出してデータを受け取る感じです。(stream
ってやつかと)
まず、echo
を使っていてこのメソッドをハンドラとして公開するイメージです。
func (h *WatcherHandler) WatchResources(c echo.Context) error { ctx := c.Request().Context() c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON) c.Response().WriteHeader(http.StatusOK) // Start to watch and do server push err := h.service.WatchResources(ctx, c.Response()) if err != nil { klog.Errorf("closed to watch resources: %+v", err) return echo.NewHTTPError(http.StatusInternalServerError) } return nil }
肝心のwatchする部分はこんな感じ。もう消すコードだったのでちょっと雑に貼ります、、
type ResponseStream interface { io.Writer http.Flusher } func (s *Service) WatchResources(ctx context.Context, stream ResponseStream) error { podsWatcher := cache.NewListWatchFromClient(s.client.CoreV1().RESTClient(), string(Pods), corev1.NamespaceAll, fields.Everything()) _, podsController := cache.NewInformer(podsWatcher, &corev1.Pod{}, 0, createResourceEventHandlerFuncs(watchEventQ, Pods)) go podsController.Run(ctx.Done()) <-ctx.Done() return xerrors.Errorf("terminated to watch: %v", ctx.Err()) } func createResourceEventHandlerFuncs(weq *watchEventQueue, r resourceKind) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: AddedEventHandler(weq, r), UpdateFunc: ModifiedEventHandler(weq, r), DeleteFunc: DeletedEventHandler(weq, r), } } func AddedEventHandler(weq *watchEventQueue, r resourceKind) func(obj interface{}) { return func(obj interface{}) { // ここで受け取ったイベントを処理する } } ...
server push
する部分はこんな感じ
func watchAndFlushQueue(obj interface{}, stream ResponseStream) error { enc := json.NewEncoder(stream) for { if weq.IsEmpty() { continue } if err := enc.Encode(obj); err != nil { return xerrors.Errorf("encode a WatchEvent in the queue: %w", err) } stream.Flush() // これでクライアントにpushされる } }
このコードを使わなかった訳
cache.NewInformer
の部分では「list呼び出し→watch監視開始」と実行されます。これは、resourceVersion
フィールドの値をlistで取得してwatchでそれを使うことで、無駄な昔の情報をwatchで取得しないようにとかに使います。またこのメソッドで生成されるやつは、watch接続が途切れた場合に、自動で復帰もしてくれる(はず)の優しいやつです。
それが故に、この一連の動作を再度呼び出した時に再度listから実行してしまうことが予想されます。これは今回の私の目的にあっていませんでした。また、resourceVersion
フィールドは外側から指定することができず完全に隠されているので、クライアント側からresourceVersion
を指定して、好きな位置から情報取得をすることができません。
これらにより、便利ではあるんですが、今回は上記の実装は諦めました。