Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions helm/bundles/cortex-nova/alerts/nova.alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,36 @@ groups:
failed to find a valid `{{$labels.hvtype}}` host. This may indicate
capacity issues, misconfigured filters, or resource constraints in the
datacenter. Investigate the affected VMs and hypervisor availability.

# Fires when a datasource resource has existed for 60 minutes without ever
# completing a first reconciliation. The queued="false" label on the
# cortex_datasource_seconds_until_reconcile metric is only emitted for
# datasources that have no NextSyncTime yet (i.e. never reconciled), so the
# `count ... > 0` expression is effectively an existence check on that series.
- alert: CortexNovaNewDatasourcesNotReconciling
  expr: count by(datasource) (cortex_datasource_seconds_until_reconcile{queued="false",domain="nova"}) > 0
  for: 60m
  labels:
    context: datasources
    dashboard: cortex/cortex
    service: cortex
    severity: warning
    support_group: workload-management
  annotations:
    summary: "New datasource `{{$labels.datasource}}` has not reconciled"
    description: >
      A new datasource `{{$labels.datasource}}` has been added but has not
      completed its first reconciliation yet. This may indicate issues with
      the datasource controller's workqueue overprioritizing other datasources.

# Fires when an already-reconciled datasource (queued="true") is overdue for
# its next reconciliation by more than 600 seconds, sustained for 10 minutes.
# Negative values of cortex_datasource_seconds_until_reconcile mean the
# datasource's NextSyncTime lies in the past.
# NOTE(review): renamed from "...LackingBehind" — the idiom is "lagging
# behind"; update any silences/runbooks referencing the old alert name.
- alert: CortexNovaExistingDatasourcesLaggingBehind
  expr: sum by(datasource) (cortex_datasource_seconds_until_reconcile{queued="true",domain="nova"}) < -600
  for: 10m
  labels:
    context: datasources
    dashboard: cortex/cortex
    service: cortex
    severity: warning
    support_group: workload-management
  annotations:
    summary: "Existing datasource `{{$labels.datasource}}` is lagging behind"
    description: >
      An existing datasource `{{$labels.datasource}}` has been overdue for
      reconciliation by more than 10 minutes. This may indicate issues with
      the datasource controller's workqueue or that this or another datasource
      is taking an unusually long time to reconcile.
70 changes: 58 additions & 12 deletions internal/knowledge/kpis/plugins/deployment/datasource_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,81 @@ package deployment

import (
"context"
"time"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/knowledge/db"
"github.com/cobaltcore-dev/cortex/internal/knowledge/kpis/plugins"
"github.com/cobaltcore-dev/cortex/pkg/conf"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var datasourceStateKPILogger = ctrl.Log.WithName("datasource-state-kpi")

// DatasourceStateKPIOpts defines the options for the DatasourceStateKPI
// which are loaded through the kpi resource.
type DatasourceStateKPIOpts struct {
	// DatasourceSchedulingDomain describes the scheduling domain to filter
	// datasources by. It is also used as the value of the "domain" label on
	// the exported metrics.
	DatasourceSchedulingDomain v1alpha1.SchedulingDomain `json:"datasourceSchedulingDomain"`
}

// DatasourceStateKPI observes the state of datasource resources managed by cortex.
type DatasourceStateKPI struct {
	// Common base for all KPIs that provides standard functionality
	// (database, client, and options handling).
	plugins.BaseKPI[DatasourceStateKPIOpts]

	// counter tracks the state of datasources, labeled by domain,
	// datasource name, and state.
	counter *prometheus.Desc

	// gaugeSecondsUntilReconcile is a prometheus gauge that tracks the seconds
	// until the datasource should be reconciled again, labeled by domain,
	// datasource name, and whether the datasource is queued. This can help
	// identify if there are issues with the reconciliation loop or if the
	// datasource is not being updated as expected.
	gaugeSecondsUntilReconcile *prometheus.Desc
}

// GetName returns the unique identifier under which this kpi is registered
// and configured.
func (DatasourceStateKPI) GetName() string {
	return "datasource_state_kpi"
}

// Initialize the KPI.
// Init initializes the kpi, e.g. by creating the necessary Prometheus
// descriptors. The base kpi is also initialized with the provided database,
// client and options.
func (k *DatasourceStateKPI) Init(db *db.DB, client client.Client, opts conf.RawOpts) error {
if err := k.BaseKPI.Init(db, client, opts); err != nil {
return err
}
k.counter = prometheus.NewDesc(
"cortex_datasource_state",
k.counter = prometheus.NewDesc("cortex_datasource_state",
"State of cortex managed datasources",
[]string{"domain", "datasource", "state"},
nil,
[]string{"domain", "datasource", "state"}, nil,
)
k.gaugeSecondsUntilReconcile = prometheus.NewDesc("cortex_datasource_seconds_until_reconcile",
"Seconds until the datasource should be reconciled again. "+
"Negative values indicate the datasource is x seconds overdue for "+
"reconciliation.",
[]string{"domain", "datasource", "queued"}, nil,
)
return nil
}

// Describe implements the prometheus.Collector interface by sending the
// descriptors of every metric this kpi exposes to the provided channel.
func (k *DatasourceStateKPI) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range []*prometheus.Desc{k.counter, k.gaugeSecondsUntilReconcile} {
		ch <- desc
	}
}

// Collect the datasource state metrics.
// Collect collects the current state of datasources from the database and
// sends it as Prometheus metrics to the provided channel.
func (k *DatasourceStateKPI) Collect(ch chan<- prometheus.Metric) {
// Get all datasources with the specified datasource operator.
datasourceList := &v1alpha1.DatasourceList{}
if err := k.Client.List(context.Background(), datasourceList); err != nil {
datasourceStateKPILogger.Error(err, "Failed to list datasources")
return
}
var datasources []v1alpha1.Datasource
Expand All @@ -75,5 +102,24 @@ func (k *DatasourceStateKPI) Collect(ch chan<- prometheus.Metric) {
k.counter, prometheus.GaugeValue, 1,
string(k.Options.DatasourceSchedulingDomain), ds.Name, state,
)
if !ds.Status.NextSyncTime.IsZero() {
// This resource is queued and we can calculate the seconds until
// it should be reconciled again (can be negative if in the past).
secondsUntilReconcile := time.Until(ds.Status.NextSyncTime.Time).Seconds()
ch <- prometheus.MustNewConstMetric(
k.gaugeSecondsUntilReconcile, prometheus.GaugeValue, secondsUntilReconcile,
string(k.Options.DatasourceSchedulingDomain), ds.Name, "true",
)
} else {
// This resource is not queued (never reconciled). In this case
// we take the time since creation as a proxy for how long it has
// been until the first reconciliation request.
secondsSinceCreation := time.Since(ds.CreationTimestamp.Time).Seconds()
ch <- prometheus.MustNewConstMetric(
k.gaugeSecondsUntilReconcile, prometheus.GaugeValue, -secondsSinceCreation,
string(k.Options.DatasourceSchedulingDomain), ds.Name, "false",
)
}
}
datasourceStateKPILogger.Info("Collected datasource state metrics", "count", len(datasources))
}
132 changes: 120 additions & 12 deletions internal/knowledge/kpis/plugins/deployment/datasource_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package deployment

import (
"testing"
"time"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/pkg/conf"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
Expand All @@ -30,7 +32,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
name string
datasources []v1alpha1.Datasource
operator string
expectedCount int
expectedCount int // 2 metrics per datasource: state counter + seconds until reconcile gauge
description string
}{
{
Expand All @@ -53,8 +55,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
expectedCount: 1,
description: "should collect metric for ready datasource",
expectedCount: 2,
description: "should collect metrics for ready datasource",
},
{
name: "datasource in error state",
Expand All @@ -75,8 +77,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
expectedCount: 1,
description: "should collect metric for error datasource",
expectedCount: 2,
description: "should collect metrics for error datasource",
},
{
name: "multiple datasources different states",
Expand Down Expand Up @@ -115,7 +117,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
expectedCount: 3,
expectedCount: 6,
description: "should collect metrics for all datasources with different states",
},
{
Expand All @@ -139,7 +141,7 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
expectedCount: 1,
expectedCount: 2,
description: "should only collect metrics for datasources with matching operator",
},
{
Expand All @@ -155,8 +157,8 @@ func TestDatasourceStateKPI_Collect(t *testing.T) {
},
},
operator: "test-operator",
expectedCount: 1,
description: "should collect metric with unknown state for datasource without objects or conditions",
expectedCount: 2,
description: "should collect metrics with unknown state for datasource without objects or conditions",
},
}

Expand Down Expand Up @@ -206,7 +208,7 @@ func TestDatasourceStateKPI_Describe(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}

ch := make(chan *prometheus.Desc, 1)
ch := make(chan *prometheus.Desc, 2)
kpi.Describe(ch)
close(ch)

Expand All @@ -215,7 +217,113 @@ func TestDatasourceStateKPI_Describe(t *testing.T) {
descCount++
}

if descCount != 1 {
t.Errorf("expected 1 descriptor, got %d", descCount)
if descCount != 2 {
t.Errorf("expected 2 descriptors, got %d", descCount)
}
}

// TestDatasourceStateKPI_GaugeSecondsUntilReconcile verifies that the
// seconds-until-reconcile gauge carries the correct "queued" label and the
// correct sign: positive when NextSyncTime is in the future, negative when it
// is overdue, and negative (time since creation) when never reconciled.
func TestDatasourceStateKPI_GaugeSecondsUntilReconcile(t *testing.T) {
	scheme, err := v1alpha1.SchemeBuilder.Build()
	if err != nil {
		t.Fatalf("expected no error, got %v", err)
	}

	now := time.Now()
	cases := []struct {
		name         string
		datasource   v1alpha1.Datasource
		expectQueued bool
		expectSign   int // 1 for positive, -1 for negative
	}{
		{
			name: "datasource with NextSyncTime in future",
			datasource: v1alpha1.Datasource{
				ObjectMeta: v1.ObjectMeta{Name: "ds-queued", CreationTimestamp: v1.NewTime(now.Add(-time.Hour))},
				Spec:       v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"},
				Status: v1alpha1.DatasourceStatus{
					NextSyncTime: v1.NewTime(now.Add(30 * time.Second)),
				},
			},
			expectQueued: true,
			expectSign:   1,
		},
		{
			name: "datasource with NextSyncTime in past",
			datasource: v1alpha1.Datasource{
				ObjectMeta: v1.ObjectMeta{Name: "ds-overdue", CreationTimestamp: v1.NewTime(now.Add(-time.Hour))},
				Spec:       v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"},
				Status: v1alpha1.DatasourceStatus{
					NextSyncTime: v1.NewTime(now.Add(-30 * time.Second)),
				},
			},
			expectQueued: true,
			expectSign:   -1,
		},
		{
			name: "datasource never reconciled (no NextSyncTime)",
			datasource: v1alpha1.Datasource{
				ObjectMeta: v1.ObjectMeta{Name: "ds-never-reconciled", CreationTimestamp: v1.NewTime(now.Add(-time.Minute))},
				Spec:       v1alpha1.DatasourceSpec{SchedulingDomain: "test-operator"},
				Status:     v1alpha1.DatasourceStatus{},
			},
			expectQueued: false,
			expectSign:   -1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			fakeClient := fake.NewClientBuilder().
				WithScheme(scheme).
				WithObjects(&tc.datasource).
				Build()

			kpi := &DatasourceStateKPI{}
			if err := kpi.Init(nil, fakeClient, conf.NewRawOpts(`{"datasourceSchedulingDomain": "test-operator"}`)); err != nil {
				t.Fatalf("expected no error, got %v", err)
			}

			metrics := make(chan prometheus.Metric, 10)
			kpi.Collect(metrics)
			close(metrics)

			// The gauge is the only metric carrying a "queued" label, so
			// scanning labels is enough to pick it out of the channel.
			wantQueued := "false"
			if tc.expectQueued {
				wantQueued = "true"
			}

			var queuedMetric prometheus.Metric
			for m := range metrics {
				var written dto.Metric
				if err := m.Write(&written); err != nil {
					t.Fatalf("failed to write metric: %v", err)
				}
				for _, label := range written.Label {
					if label.GetName() != "queued" {
						continue
					}
					queuedMetric = m
					if got := label.GetValue(); got != wantQueued {
						t.Errorf("expected queued=%s, got queued=%s", wantQueued, got)
					}
					break
				}
			}

			if queuedMetric == nil {
				t.Fatal("expected gaugeSecondsUntilReconcile metric to be collected")
			}

			var written dto.Metric
			if err := queuedMetric.Write(&written); err != nil {
				t.Fatalf("failed to write metric: %v", err)
			}

			value := written.Gauge.GetValue()
			if tc.expectSign == 1 && value <= 0 {
				t.Errorf("expected positive value, got %f", value)
			}
			if tc.expectSign == -1 && value >= 0 {
				t.Errorf("expected negative value, got %f", value)
			}
		})
	}
}
Loading