diff --git a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml index e3271f119..e96dbb48c 100644 --- a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml +++ b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml @@ -609,3 +609,36 @@ groups: failed to find a valid `{{$labels.hvtype}}` host. This may indicate capacity issues, misconfigured filters, or resource constraints in the datacenter. Investigate the affected VMs and hypervisor availability. + + - alert: CortexNovaNewDatasourcesNotReconciling + expr: count by(datasource) (cortex_datasource_seconds_until_reconcile{queued="false",domain="nova"}) > 0 + for: 60m + labels: + context: datasources + dashboard: cortex/cortex + service: cortex + severity: warning + support_group: workload-management + annotations: + summary: "New datasource `{{$labels.datasource}}` has not reconciled" + description: > + A new datasource `{{$labels.datasource}}` has been added but has not + completed its first reconciliation yet. This may indicate issues with + the datasource controller's workqueue overprioritizing other datasources. + + - alert: CortexNovaExistingDatasourcesLaggingBehind + expr: sum by(datasource) (cortex_datasource_seconds_until_reconcile{queued="true",domain="nova"}) < -600 + for: 10m + labels: + context: datasources + dashboard: cortex/cortex + service: cortex + severity: warning + support_group: workload-management + annotations: + summary: "Existing datasource `{{$labels.datasource}}` is lagging behind" + description: > + An existing datasource `{{$labels.datasource}}` is overdue for + reconciliation by more than 10 minutes. This may indicate issues with + the datasource controller's workqueue or that this or another datasource + is taking an unusually long time to reconcile. 
diff --git a/internal/knowledge/kpis/plugins/deployment/datasource_state.go b/internal/knowledge/kpis/plugins/deployment/datasource_state.go index 77f6ab501..a4b65ed79 100644 --- a/internal/knowledge/kpis/plugins/deployment/datasource_state.go +++ b/internal/knowledge/kpis/plugins/deployment/datasource_state.go @@ -5,6 +5,7 @@ package deployment import ( "context" + "time" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/knowledge/db" @@ -12,47 +13,73 @@ import ( "github.com/cobaltcore-dev/cortex/pkg/conf" "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/api/meta" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) +var datasourceStateKPILogger = ctrl.Log.WithName("datasource-state-kpi") + +// DatasourceStateKPIOpts defines the options for the DatasourceStateKPI +// which are loaded through the kpi resource. type DatasourceStateKPIOpts struct { - // The scheduling domain to filter datasources by. + // DatasourceSchedulingDomain describes the scheduling domain to filter + // datasources by. DatasourceSchedulingDomain v1alpha1.SchedulingDomain `json:"datasourceSchedulingDomain"` } -// KPI observing the state of datasource resources managed by cortex. +// DatasourceStateKPI observes the state of datasource resources managed by cortex. type DatasourceStateKPI struct { - // Common base for all KPIs that provides standard functionality. plugins.BaseKPI[DatasourceStateKPIOpts] - // Prometheus descriptor for the datasource state metric. + // Counter that tracks the state of datasources, labeled by domain, + // datasource name, and state. counter *prometheus.Desc + + // gaugeSecondsUntilReconcile is a prometheus gauge that tracks the seconds + // until the datasource should be reconciled again, labeled by domain and + // datasource name. This can help identify if there are issues with the + // reconciliation loop or if the datasource is not being updated as expected. 
+ gaugeSecondsUntilReconcile *prometheus.Desc } +// GetName returns a unique name for this kpi that is used for registration +// and configuration. func (DatasourceStateKPI) GetName() string { return "datasource_state_kpi" } -// Initialize the KPI. +// Init initializes the kpi, e.g. by creating the necessary Prometheus +// descriptors. The base kpi is also initialized with the provided database, +// client and options. func (k *DatasourceStateKPI) Init(db *db.DB, client client.Client, opts conf.RawOpts) error { if err := k.BaseKPI.Init(db, client, opts); err != nil { return err } - k.counter = prometheus.NewDesc( - "cortex_datasource_state", + k.counter = prometheus.NewDesc("cortex_datasource_state", "State of cortex managed datasources", - []string{"domain", "datasource", "state"}, - nil, + []string{"domain", "datasource", "state"}, nil, + ) + k.gaugeSecondsUntilReconcile = prometheus.NewDesc("cortex_datasource_seconds_until_reconcile", + "Seconds until the datasource should be reconciled again. "+ + "Negative values indicate the datasource is x seconds overdue for "+ + "reconciliation.", + []string{"domain", "datasource", "queued"}, nil, ) return nil } -// Conform to the prometheus collector interface by providing the descriptor. -func (k *DatasourceStateKPI) Describe(ch chan<- *prometheus.Desc) { ch <- k.counter } +// Describe sends the descriptor of this kpi to the provided channel. This is +// used by Prometheus to know which metrics this kpi exposes. +func (k *DatasourceStateKPI) Describe(ch chan<- *prometheus.Desc) { + ch <- k.counter + ch <- k.gaugeSecondsUntilReconcile +} -// Collect the datasource state metrics. +// Collect collects the current state of datasources from the database and +// sends it as Prometheus metrics to the provided channel. func (k *DatasourceStateKPI) Collect(ch chan<- prometheus.Metric) { // Get all datasources with the specified datasource operator. 
datasourceList := &v1alpha1.DatasourceList{} if err := k.Client.List(context.Background(), datasourceList); err != nil { + datasourceStateKPILogger.Error(err, "Failed to list datasources") return } var datasources []v1alpha1.Datasource @@ -75,5 +102,24 @@ func (k *DatasourceStateKPI) Collect(ch chan<- prometheus.Metric) { k.counter, prometheus.GaugeValue, 1, string(k.Options.DatasourceSchedulingDomain), ds.Name, state, ) + if !ds.Status.NextSyncTime.IsZero() { + // This resource is queued and we can calculate the seconds until + // it should be reconciled again (can be negative if in the past). + secondsUntilReconcile := time.Until(ds.Status.NextSyncTime.Time).Seconds() + ch <- prometheus.MustNewConstMetric( + k.gaugeSecondsUntilReconcile, prometheus.GaugeValue, secondsUntilReconcile, + string(k.Options.DatasourceSchedulingDomain), ds.Name, "true", + ) + } else { + // This resource is not queued (never reconciled). In this case + // we take the time since creation as a proxy for how long it has + // been until the first reconciliation request. 
+ secondsSinceCreation := time.Since(ds.CreationTimestamp.Time).Seconds() + ch <- prometheus.MustNewConstMetric( + k.gaugeSecondsUntilReconcile, prometheus.GaugeValue, -secondsSinceCreation, + string(k.Options.DatasourceSchedulingDomain), ds.Name, "false", + ) + } } + datasourceStateKPILogger.Info("Collected datasource state metrics", "count", len(datasources)) } diff --git a/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go b/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go index d75984762..d6e56bd35 100644 --- a/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go +++ b/internal/knowledge/kpis/plugins/deployment/datasource_state_test.go @@ -5,10 +5,12 @@ package deployment import ( "testing" + "time" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/pkg/conf" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -30,7 +32,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) { name string datasources []v1alpha1.Datasource operator string - expectedCount int + expectedCount int // 2 metrics per datasource: state counter + seconds until reconcile gauge description string }{ { @@ -53,8 +55,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) { }, }, operator: "test-operator", - expectedCount: 1, - description: "should collect metric for ready datasource", + expectedCount: 2, + description: "should collect metrics for ready datasource", }, { name: "datasource in error state", @@ -75,8 +77,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) { }, }, operator: "test-operator", - expectedCount: 1, - description: "should collect metric for error datasource", + expectedCount: 2, + description: "should collect metrics for error datasource", }, { name: "multiple datasources different states", @@ -115,7 +117,7 @@ func TestDatasourceStateKPI_Collect(t 
*testing.T) { }, }, operator: "test-operator", - expectedCount: 3, + expectedCount: 6, description: "should collect metrics for all datasources with different states", }, { @@ -139,7 +141,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) { }, }, operator: "test-operator", - expectedCount: 1, + expectedCount: 2, description: "should only collect metrics for datasources with matching operator", }, { @@ -155,8 +157,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) { }, }, operator: "test-operator", - expectedCount: 1, - description: "should collect metric with unknown state for datasource without objects or conditions", + expectedCount: 2, + description: "should collect metrics with unknown state for datasource without objects or conditions", }, } @@ -206,7 +208,7 @@ func TestDatasourceStateKPI_Describe(t *testing.T) { t.Fatalf("expected no error, got %v", err) } - ch := make(chan *prometheus.Desc, 1) + ch := make(chan *prometheus.Desc, 2) kpi.Describe(ch) close(ch) @@ -215,7 +217,113 @@ func TestDatasourceStateKPI_Describe(t *testing.T) { descCount++ } - if descCount != 1 { - t.Errorf("expected 1 descriptor, got %d", descCount) + if descCount != 2 { + t.Errorf("expected 2 descriptors, got %d", descCount) + } +} + +func TestDatasourceStateKPI_GaugeSecondsUntilReconcile(t *testing.T) { + scheme, err := v1alpha1.SchemeBuilder.Build() + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + now := time.Now() + tests := []struct { + name string + datasource v1alpha1.Datasource + expectQueued bool + expectSign int // 1 for positive, -1 for negative + }{ + { + name: "datasource with NextSyncTime in future", + datasource: v1alpha1.Datasource{ + ObjectMeta: v1.ObjectMeta{Name: "ds-queued", CreationTimestamp: v1.NewTime(now.Add(-time.Hour))}, + Spec: v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"}, + Status: v1alpha1.DatasourceStatus{ + NextSyncTime: v1.NewTime(now.Add(30 * time.Second)), + }, + }, + expectQueued: true, + expectSign: 1, + }, 
+ { + name: "datasource with NextSyncTime in past", + datasource: v1alpha1.Datasource{ + ObjectMeta: v1.ObjectMeta{Name: "ds-overdue", CreationTimestamp: v1.NewTime(now.Add(-time.Hour))}, + Spec: v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"}, + Status: v1alpha1.DatasourceStatus{ + NextSyncTime: v1.NewTime(now.Add(-30 * time.Second)), + }, + }, + expectQueued: true, + expectSign: -1, + }, + { + name: "datasource never reconciled (no NextSyncTime)", + datasource: v1alpha1.Datasource{ + ObjectMeta: v1.ObjectMeta{Name: "ds-never-reconciled", CreationTimestamp: v1.NewTime(now.Add(-time.Minute))}, + Spec: v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"}, + Status: v1alpha1.DatasourceStatus{}, + }, + expectQueued: false, + expectSign: -1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(&tt.datasource). + Build() + + kpi := &DatasourceStateKPI{} + if err := kpi.Init(nil, client, conf.NewRawOpts(`{"datasourceSchedulingDomain": "test-operator"}`)); err != nil { + t.Fatalf("expected no error, got %v", err) + } + + ch := make(chan prometheus.Metric, 10) + kpi.Collect(ch) + close(ch) + + var gaugeMetric prometheus.Metric + for m := range ch { + var metric dto.Metric + if err := m.Write(&metric); err != nil { + t.Fatalf("failed to write metric: %v", err) + } + for _, label := range metric.Label { + if label.GetName() == "queued" { + gaugeMetric = m + expectedQueued := "false" + if tt.expectQueued { + expectedQueued = "true" + } + if label.GetValue() != expectedQueued { + t.Errorf("expected queued=%s, got queued=%s", expectedQueued, label.GetValue()) + } + break + } + } + } + + if gaugeMetric == nil { + t.Fatal("expected gaugeSecondsUntilReconcile metric to be collected") + } + + var metric dto.Metric + if err := gaugeMetric.Write(&metric); err != nil { + t.Fatalf("failed to write metric: %v", err) + } + + value := metric.Gauge.GetValue() + if 
tt.expectSign == 1 && value <= 0 { + t.Errorf("expected positive value, got %f", value) + } + if tt.expectSign == -1 && value >= 0 { + t.Errorf("expected negative value, got %f", value) + } + }) } }