From 2cf028696127737bc2218049bf722dbf5f2040f4 Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Tue, 31 Mar 2026 08:51:06 +0200
Subject: [PATCH 1/2] Monitor datasource recency and alert on delays
---
.../plugins/deployment/datasource_state.go | 70 ++++++++--
.../deployment/datasource_state_test.go | 132 ++++++++++++++++--
2 files changed, 178 insertions(+), 24 deletions(-)
diff --git a/internal/knowledge/kpis/plugins/deployment/datasource_state.go b/internal/knowledge/kpis/plugins/deployment/datasource_state.go
index 77f6ab501..a4b65ed79 100644
--- a/internal/knowledge/kpis/plugins/deployment/datasource_state.go
+++ b/internal/knowledge/kpis/plugins/deployment/datasource_state.go
@@ -5,6 +5,7 @@ package deployment
import (
"context"
+ "time"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/knowledge/db"
@@ -12,47 +13,73 @@ import (
"github.com/cobaltcore-dev/cortex/pkg/conf"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
+ ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
+var datasourceStateKPILogger = ctrl.Log.WithName("datasource-state-kpi")
+
+// DatasourceStateKPIOpts defines the options for the DatasourceStateKPI
+// which are loaded through the kpi resource.
type DatasourceStateKPIOpts struct {
- // The scheduling domain to filter datasources by.
+ // DatasourceSchedulingDomain describes the scheduling domain to filter
+ // datasources by.
DatasourceSchedulingDomain v1alpha1.SchedulingDomain `json:"datasourceSchedulingDomain"`
}
-// KPI observing the state of datasource resources managed by cortex.
+// DatasourceStateKPI observes the state of datasource resources managed by cortex.
type DatasourceStateKPI struct {
- // Common base for all KPIs that provides standard functionality.
plugins.BaseKPI[DatasourceStateKPIOpts]
- // Prometheus descriptor for the datasource state metric.
+ // counter is the descriptor for the datasource state metric (emitted as
+ // a gauge with value 1), labeled by domain, datasource name, and state.
counter *prometheus.Desc
+
+ // gaugeSecondsUntilReconcile is a prometheus gauge that tracks the seconds
+ // until the datasource should be reconciled again, labeled by domain and
+ // datasource name. This can help identify if there are issues with the
+ // reconciliation loop or if the datasource is not being updated as expected.
+ gaugeSecondsUntilReconcile *prometheus.Desc
}
+// GetName returns a unique name for this kpi that is used for registration
+// and configuration.
func (DatasourceStateKPI) GetName() string { return "datasource_state_kpi" }
-// Initialize the KPI.
+// Init initializes the kpi, e.g. by creating the necessary Prometheus
+// descriptors. The base kpi is also initialized with the provided database,
+// client and options.
func (k *DatasourceStateKPI) Init(db *db.DB, client client.Client, opts conf.RawOpts) error {
if err := k.BaseKPI.Init(db, client, opts); err != nil {
return err
}
- k.counter = prometheus.NewDesc(
- "cortex_datasource_state",
+ k.counter = prometheus.NewDesc("cortex_datasource_state",
"State of cortex managed datasources",
- []string{"domain", "datasource", "state"},
- nil,
+ []string{"domain", "datasource", "state"}, nil,
+ )
+ k.gaugeSecondsUntilReconcile = prometheus.NewDesc("cortex_datasource_seconds_until_reconcile",
+ "Seconds until the datasource should be reconciled again. "+
+ "Negative values indicate the datasource is x seconds overdue for "+
+ "reconciliation.",
+ []string{"domain", "datasource", "queued"}, nil,
)
return nil
}
-// Conform to the prometheus collector interface by providing the descriptor.
-func (k *DatasourceStateKPI) Describe(ch chan<- *prometheus.Desc) { ch <- k.counter }
+// Describe sends the descriptor of this kpi to the provided channel. This is
+// used by Prometheus to know which metrics this kpi exposes.
+func (k *DatasourceStateKPI) Describe(ch chan<- *prometheus.Desc) {
+ ch <- k.counter
+ ch <- k.gaugeSecondsUntilReconcile
+}
-// Collect the datasource state metrics.
+// Collect collects the current state of datasources from the database and
+// sends it as Prometheus metrics to the provided channel.
func (k *DatasourceStateKPI) Collect(ch chan<- prometheus.Metric) {
// Get all datasources with the specified datasource operator.
datasourceList := &v1alpha1.DatasourceList{}
if err := k.Client.List(context.Background(), datasourceList); err != nil {
+ datasourceStateKPILogger.Error(err, "Failed to list datasources")
return
}
var datasources []v1alpha1.Datasource
@@ -75,5 +102,24 @@ func (k *DatasourceStateKPI) Collect(ch chan<- prometheus.Metric) {
k.counter, prometheus.GaugeValue, 1,
string(k.Options.DatasourceSchedulingDomain), ds.Name, state,
)
+ if !ds.Status.NextSyncTime.IsZero() {
+ // This resource is queued and we can calculate the seconds until
+ // it should be reconciled again (can be negative if in the past).
+ secondsUntilReconcile := time.Until(ds.Status.NextSyncTime.Time).Seconds()
+ ch <- prometheus.MustNewConstMetric(
+ k.gaugeSecondsUntilReconcile, prometheus.GaugeValue, secondsUntilReconcile,
+ string(k.Options.DatasourceSchedulingDomain), ds.Name, "true",
+ )
+ } else {
+ // This resource is not queued (never reconciled). In this case
+ // we take the time since creation as a proxy for how long it has
+ // been waiting for its first reconciliation.
+ secondsSinceCreation := time.Since(ds.CreationTimestamp.Time).Seconds()
+ ch <- prometheus.MustNewConstMetric(
+ k.gaugeSecondsUntilReconcile, prometheus.GaugeValue, -secondsSinceCreation,
+ string(k.Options.DatasourceSchedulingDomain), ds.Name, "false",
+ )
+ }
}
+ datasourceStateKPILogger.Info("Collected datasource state metrics", "count", len(datasources))
}
diff --git a/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go b/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go
index d75984762..d6e56bd35 100644
--- a/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go
+++ b/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go
@@ -5,10 +5,12 @@ package deployment
import (
"testing"
+ "time"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/pkg/conf"
"github.com/prometheus/client_golang/prometheus"
+ dto "github.com/prometheus/client_model/go"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
@@ -30,7 +32,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
name string
datasources []v1alpha1.Datasource
operator string
- expectedCount int
+ expectedCount int // 2 metrics per datasource: state counter + seconds until reconcile gauge
description string
}{
{
@@ -53,8 +55,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
- expectedCount: 1,
- description: "should collect metric for ready datasource",
+ expectedCount: 2,
+ description: "should collect metrics for ready datasource",
},
{
name: "datasource in error state",
@@ -75,8 +77,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
- expectedCount: 1,
- description: "should collect metric for error datasource",
+ expectedCount: 2,
+ description: "should collect metrics for error datasource",
},
{
name: "multiple datasources different states",
@@ -115,7 +117,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
- expectedCount: 3,
+ expectedCount: 6,
description: "should collect metrics for all datasources with different states",
},
{
@@ -139,7 +141,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
- expectedCount: 1,
+ expectedCount: 2,
description: "should only collect metrics for datasources with matching operator",
},
{
@@ -155,8 +157,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
- expectedCount: 1,
- description: "should collect metric with unknown state for datasource without objects or conditions",
+ expectedCount: 2,
+ description: "should collect metrics with unknown state for datasource without objects or conditions",
},
}
@@ -206,7 +208,7 @@ func TestDatasourceStateKPI_Describe(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}
- ch := make(chan *prometheus.Desc, 1)
+ ch := make(chan *prometheus.Desc, 2)
kpi.Describe(ch)
close(ch)
@@ -215,7 +217,113 @@ func TestDatasourceStateKPI_Describe(t *testing.T) {
descCount++
}
- if descCount != 1 {
- t.Errorf("expected 1 descriptor, got %d", descCount)
+ if descCount != 2 {
+ t.Errorf("expected 2 descriptors, got %d", descCount)
+ }
+}
+
+func TestDatasourceStateKPI_GaugeSecondsUntilReconcile(t *testing.T) {
+ scheme, err := v1alpha1.SchemeBuilder.Build()
+ if err != nil {
+ t.Fatalf("expected no error, got %v", err)
+ }
+
+ now := time.Now()
+ tests := []struct {
+ name string
+ datasource v1alpha1.Datasource
+ expectQueued bool
+ expectSign int // 1 for positive, -1 for negative
+ }{
+ {
+ name: "datasource with NextSyncTime in future",
+ datasource: v1alpha1.Datasource{
+ ObjectMeta: v1.ObjectMeta{Name: "ds-queued", CreationTimestamp: v1.NewTime(now.Add(-time.Hour))},
+ Spec: v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"},
+ Status: v1alpha1.DatasourceStatus{
+ NextSyncTime: v1.NewTime(now.Add(30 * time.Second)),
+ },
+ },
+ expectQueued: true,
+ expectSign: 1,
+ },
+ {
+ name: "datasource with NextSyncTime in past",
+ datasource: v1alpha1.Datasource{
+ ObjectMeta: v1.ObjectMeta{Name: "ds-overdue", CreationTimestamp: v1.NewTime(now.Add(-time.Hour))},
+ Spec: v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"},
+ Status: v1alpha1.DatasourceStatus{
+ NextSyncTime: v1.NewTime(now.Add(-30 * time.Second)),
+ },
+ },
+ expectQueued: true,
+ expectSign: -1,
+ },
+ {
+ name: "datasource never reconciled (no NextSyncTime)",
+ datasource: v1alpha1.Datasource{
+ ObjectMeta: v1.ObjectMeta{Name: "ds-never-reconciled", CreationTimestamp: v1.NewTime(now.Add(-time.Minute))},
+ Spec: v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"},
+ Status: v1alpha1.DatasourceStatus{},
+ },
+ expectQueued: false,
+ expectSign: -1,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ client := fake.NewClientBuilder().
+ WithScheme(scheme).
+ WithObjects(&tt.datasource).
+ Build()
+
+ kpi := &DatasourceStateKPI{}
+ if err := kpi.Init(nil, client, conf.NewRawOpts(`{"datasourceSchedulingDomain": "test-operator"}`)); err != nil {
+ t.Fatalf("expected no error, got %v", err)
+ }
+
+ ch := make(chan prometheus.Metric, 10)
+ kpi.Collect(ch)
+ close(ch)
+
+ var gaugeMetric prometheus.Metric
+ for m := range ch {
+ var metric dto.Metric
+ if err := m.Write(&metric); err != nil {
+ t.Fatalf("failed to write metric: %v", err)
+ }
+ for _, label := range metric.Label {
+ if label.GetName() == "queued" {
+ gaugeMetric = m
+ expectedQueued := "false"
+ if tt.expectQueued {
+ expectedQueued = "true"
+ }
+ if label.GetValue() != expectedQueued {
+ t.Errorf("expected queued=%s, got queued=%s", expectedQueued, label.GetValue())
+ }
+ break
+ }
+ }
+ }
+
+ if gaugeMetric == nil {
+ t.Fatal("expected gaugeSecondsUntilReconcile metric to be collected")
+ }
+
+ var metric dto.Metric
+ if err := gaugeMetric.Write(&metric); err != nil {
+ t.Fatalf("failed to write metric: %v", err)
+ }
+
+ value := metric.Gauge.GetValue()
+ if tt.expectSign == 1 && value <= 0 {
+ t.Errorf("expected positive value, got %f", value)
+ }
+ if tt.expectSign == -1 && value >= 0 {
+ t.Errorf("expected negative value, got %f", value)
+ }
+ })
}
}
From 95d4bf442fd93298cd084022b3760d6e3eacbac9 Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Tue, 31 Mar 2026 08:51:51 +0200
Subject: [PATCH 2/2] Add alerts in separate commit to avoid merge conflict
---
.../cortex-nova/alerts/nova.alerts.yaml | 33 +++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml
index e3271f119..e96dbb48c 100644
--- a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml
+++ b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml
@@ -609,3 +609,36 @@ groups:
failed to find a valid `{{$labels.hvtype}}` host. This may indicate
capacity issues, misconfigured filters, or resource constraints in the
datacenter. Investigate the affected VMs and hypervisor availability.
+
+ - alert: CortexNovaNewDatasourcesNotReconciling
+ expr: count by(datasource) (cortex_datasource_seconds_until_reconcile{queued="false",domain="nova"}) > 0
+ for: 60m
+ labels:
+ context: datasources
+ dashboard: cortex/cortex
+ service: cortex
+ severity: warning
+ support_group: workload-management
+ annotations:
+ summary: "New datasource `{{$labels.datasource}}` has not reconciled"
+ description: >
+ A new datasource `{{$labels.datasource}}` has been added but has not
+ completed its first reconciliation yet. This may indicate issues with
+ the datasource controller's workqueue overprioritizing other datasources.
+
+ - alert: CortexNovaExistingDatasourcesLaggingBehind
+ expr: sum by(datasource) (cortex_datasource_seconds_until_reconcile{queued="true",domain="nova"}) < -600
+ for: 10m
+ labels:
+ context: datasources
+ dashboard: cortex/cortex
+ service: cortex
+ severity: warning
+ support_group: workload-management
+ annotations:
+ summary: "Existing datasource `{{$labels.datasource}}` is lagging behind"
+ description: >
+ An existing datasource `{{$labels.datasource}}` has been overdue for
+ reconciliation by more than 10 minutes. This may indicate issues with
+ the datasource controller's workqueue or that this or another datasource
+ is taking an unusually long time to reconcile.