1. qpext

当使用 Knative 部署时,一个 Pod 中通常包含两个容器:

  • queue-proxy:负责流量管理、健康检查、限流、超时和优雅关闭。
  • kserve-container:运行具体的模型预测工作负载。

Prometheus 默认只能从单个端点抓取指标,而当一个 Pod 中有多个容器时,分别暴露的指标端口可能导致监控查询变得复杂。

为了解决这个问题,qpext(Queue Proxy EXTension) 被引入:

  • qpext 扩展了 Knative queue-proxy 的功能,使其可以同时查询(即聚合)来自 queue-proxy 容器和 kserve-container 的 Prometheus 指标,并在一个统一的端点输出聚合后的结果。
  • 这样做可以让用户只需要配置一个指标抓取端点,无需分开抓取多个容器的指标,从而简化监控系统的部署与管理。

2. 主要功能和工作流程

Code

qpext/cmd/qpext/main.go

(1)解析与处理请求超时

在代码中有 getHeaderTimeout 函数,其作用是解析 HTTP 请求头中的超时信息。例如,Prometheus 会通过 X-Prometheus-Scrape-Timeout-Seconds 头传递抓取时间:

func getHeaderTimeout(timeout string) (time.Duration, error) {
	timeoutSeconds, err := strconv.ParseFloat(timeout, 64)
	if err != nil {
		return 0 * time.Second, err
	}
	return time.Duration(timeoutSeconds * 1e9), nil
}

解析得到的时间将用于为抓取请求设置上下文超时。

(2)转发部分请求头

applyHeaders 函数用于将来自原请求中的一些头(例如 “Accept”、“User-Agent”、抓取超时等)添加到新的抓取请求中,这样能保证原请求的部分信息传递下去:

func applyHeaders(into http.Header, from http.Header, keys ...string) {
	for _, key := range keys {
		val := from.Get(key)
		if val != "" {
			into.Set(key, val)
		}
	}
}

(3)获取 Serverless 标签

在 qpext 中,通过 getServerlessLabelVals 获取了一组环境变量(如 SERVING_SERVICE、SERVING_CONFIGURATION、SERVING_REVISION)的值。这些值用于后续在指标中附加标签,以便在 Prometheus 中能按服务、配置和修订版本进行过滤和聚合:

func getServerlessLabelVals() []string {
	var labelValues []string
	for _, envVar := range EnvVars {
		labelValues = append(labelValues, os.Getenv(envVar))
	}
	return labelValues
}

(4)聚合指标

qpext 的核心功能是将两个容器的指标聚合到一个端点上:

  • 它通过 HTTP 客户端发起抓取请求(使用 scrape 函数)分别获取 queue-proxy 的指标和 kserve-container 的指标数据;
  • 对于应用容器抓取到的指标,会调用 scrapeAndWriteAppMetrics 对指标进行预处理,比如:
    • 添加 serverless 标签:使用 addServerlessLabels(将环境变量中获取的标签添到每个指标)。
    • 指标清洗:某些指标类型可能是 UNTYPED,通过 sanitizeMetrics 进行转换为 counter 或 gauge,使得 Prometheus 能正确处理这些指标。
func (sc *ScrapeConfigurations) handleStats(w http.ResponseWriter, r *http.Request) {
	// ...
	if sc.QueueProxyPort != "" {
		queueProxyURL := getURL(sc.QueueProxyPort, sc.QueueProxyPath)
		// 抓取 queue-proxy 的指标
		if queueProxy, queueProxyCancel, _, err = scrape(queueProxyURL, r.Header, sc.logger); err != nil {
			sc.logger.Error("failed scraping queue proxy metrics", zap.Error(err))
		}
	}
	// 如果定义了应用端口,则抓取 kserve-container 的指标
	if sc.AppPort != "" {
		kserveContainerURL := getURL(sc.AppPort, sc.AppPath)
		if application, appCancel, contentType, err = scrape(kserveContainerURL, r.Header, sc.logger); err != nil {
			sc.logger.Error("failed scraping application metrics", zap.Error(err))
		}
	}
	// 设置输出格式为文本
	format := expfmt.FmtText
	w.Header().Set("Content-Type", string(format))
	// 先写入 queue-proxy 的指标,再处理应用指标
	if queueProxy != nil {
		_, err = io.Copy(w, queueProxy)
		if err != nil {
			sc.logger.Error("failed to scraping and writing queue proxy metrics", zap.Error(err))
		}
	}
	if application != nil {
		var parser expfmt.TextParser
		var mfs map[string]*io_prometheus_client.MetricFamily
		mfs, err = parser.TextToMetricFamilies(application)
		if err != nil {
			sc.logger.Error("error converting text to metric families", zap.Error(err))
		}
		if err = scrapeAndWriteAppMetrics(mfs, w, format, sc.logger); err != nil {
			sc.logger.Error("failed scraping and writing metrics", zap.Error(err))
		}
	}
}

在上面这段代码里,就做了下面几件事:

  • 通过 getURL 构造两个抓取 URL(一个用于 queue-proxy,一个用于 kserve-container)。
  • 分别调用 scrape 函数进行 HTTP 请求抓取。
  • 抓取回来后将 queue-proxy 的原始指标直接写入响应,再进一步处理应用指标(添加标签、sanitize 后转换为文本格式)。

(5)启动与服务部署

在 main 函数中,qpext 作为一个独立的小应用启动一个 HTTP 服务器,在特定端口(通常是通过环境变量配置)上暴露聚合后的指标端点。用户或 Prometheus 抓取时,只访问该端点,就能获取到聚合好的指标。

func main() {
	zapLogger := logger.InitializeLogger()
	mux := http.NewServeMux()
	ctx, cancel := context.WithCancel(context.Background())
	aggregateMetricsPort := os.Getenv(QueueProxyAggregatePrometheusMetricsPortEnvVarKey)
	sc := NewScrapeConfigs(
		zapLogger,
		QueueProxyMetricsPort,  // 默认队列代理 metrics 端口
		os.Getenv(KServeContainerPrometheusMetricsPortEnvVarKey), // kserve-container 的 metrics 端口
		os.Getenv(KServeContainerPrometheusMetricsPathEnvVarKey), // 指标 path
	)
	mux.HandleFunc(`/metrics`, sc.handleStats)
	l, err := net.Listen("tcp", fmt.Sprintf(":%v", aggregateMetricsPort))
	// 启动 stats server 和 sharedmain(队列代理主逻辑)
	...
}