Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions .github/workflows/docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,97 @@ jobs:
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

update-infra:
name: Update CSFX-Infra versions.nix
runs-on: ubuntu-latest
needs: [prepare, manifest, build-binaries, attach-binaries-release]
if: needs.prepare.outputs.is_release == 'true'
steps:
- uses: actions/checkout@v4
with:
repository: ${{ github.repository_owner }}/CSFX-Infra
token: ${{ secrets.INFRA_REPO_TOKEN }}
path: infra

- uses: actions/download-artifact@v4
with:
pattern: digest-*
path: /tmp/digests
merge-multiple: true

- uses: actions/download-artifact@v4
with:
pattern: csf-agent-*
path: /tmp/binaries
merge-multiple: true

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Log in to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Write versions.nix
  run: |
    # Generates infra/versions.nix from the published image digests and the
    # agent binary checksums. pipefail keeps a failed `docker buildx` from
    # being masked by the `jq` that follows it in the pipeline.
    set -euo pipefail

    VERSION="${{ needs.prepare.outputs.version }}"
    ORG=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
    REPO="${{ github.repository }}"
    RELEASE_BASE="https://github.com/${REPO}/releases/download/v${VERSION}"

    # Prints the manifest digest of ghcr.io/<org>/csf-ce-<svc>:<version>.
    # Returns non-zero when the registry answer is not "<algo>:<hex>"
    # (for example an empty string or jq's literal "null").
    get_manifest_digest() {
      local svc=$1
      local image="ghcr.io/${ORG}/csf-ce-${svc}:${VERSION}"
      local digest
      digest=$(docker buildx imagetools inspect "${image}" \
        --format '{{json .Manifest}}' | jq -r '.digest') || digest=""
      if [[ ! "${digest}" =~ ^[a-z0-9]+:[0-9a-f]+$ ]]; then
        echo "error: could not resolve manifest digest for ${image} (got '${digest}')" >&2
        return 1
      fi
      printf '%s\n' "${digest}"
    }

    # Prints the checksum stored in /tmp/binaries/<binary>-<arch>.sha256.
    # Returns non-zero when the file is missing or does not start with a
    # 64-character hex digest.
    get_sha256() {
      local binary=$1
      local arch=$2
      local file="/tmp/binaries/${binary}-${arch}.sha256"
      local sum=""
      if [[ -r "${file}" ]]; then
        sum=$(awk 'NR == 1 {print $1}' "${file}")
      fi
      if [[ ! "${sum}" =~ ^[0-9a-fA-F]{64}$ ]]; then
        echo "error: missing or malformed checksum in ${file}" >&2
        return 1
      fi
      printf '%s\n' "${sum}"
    }

    # Resolve everything BEFORE writing the file: a failing $(...) inside a
    # heredoc does not fail the step and would silently commit empty values,
    # whereas a failing assignment aborts the script under `set -e`.
    digest_api_gateway=$(get_manifest_digest api-gateway)
    digest_registry=$(get_manifest_digest registry)
    digest_scheduler=$(get_manifest_digest scheduler)
    digest_volume_manager=$(get_manifest_digest volume-manager)
    digest_failover_controller=$(get_manifest_digest failover-controller)
    digest_sdn_controller=$(get_manifest_digest sdn-controller)
    sha_amd64=$(get_sha256 csf-agent amd64)
    sha_arm64=$(get_sha256 csf-agent arm64)

    cat > infra/versions.nix <<EOF
    {
      csf = {
        version = "${VERSION}";
        images = {
          api-gateway = { digest = "${digest_api_gateway}"; };
          registry = { digest = "${digest_registry}"; };
          scheduler = { digest = "${digest_scheduler}"; };
          volume-manager = { digest = "${digest_volume_manager}"; };
          failover-controller = { digest = "${digest_failover_controller}"; };
          sdn-controller = { digest = "${digest_sdn_controller}"; };
        };
        agent = {
          amd64 = {
            url = "${RELEASE_BASE}/csf-agent-amd64";
            sha256 = "${sha_amd64}";
          };
          arm64 = {
            url = "${RELEASE_BASE}/csf-agent-arm64";
            sha256 = "${sha_arm64}";
          };
        };
      };
    }
    EOF

- name: Commit and push versions.nix
  run: |
    VERSION="${{ needs.prepare.outputs.version }}"
    cd infra

    # Author the commit as the GitHub Actions bot identity.
    git config user.name "github-actions[bot]"
    git config user.email "github-actions[bot]@users.noreply.github.com"

    git add versions.nix

    # `git diff --cached --quiet` exits 0 when nothing is staged, in which
    # case there is nothing to commit and the step ends successfully.
    if git diff --cached --quiet; then
      echo "no changes"
      exit 0
    fi

    git commit -m "chore: update versions.nix for v${VERSION}"
    git push

summary:
name: Summary
runs-on: ubuntu-latest
Expand Down
84 changes: 84 additions & 0 deletions .github/workflows/prerelease.yml
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,87 @@ jobs:
csf-agent-arm64.sha256
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

update-infra:
name: Update CSFX-Infra versions.nix
runs-on: ubuntu-latest
needs: [version, manifest, build-binaries, github-release]
steps:
- uses: actions/checkout@v4
with:
repository: ${{ github.repository_owner }}/CSFX-Infra
token: ${{ secrets.INFRA_REPO_TOKEN }}
path: infra

- uses: actions/download-artifact@v4
with:
pattern: csf-agent-*
path: /tmp/binaries
merge-multiple: true

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Log in to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Write versions.nix
  run: |
    # Generates infra/versions.nix from the published image digests and the
    # agent binary checksums. pipefail keeps a failed `docker buildx` from
    # being masked by the `jq` that follows it in the pipeline.
    set -euo pipefail

    VERSION="${{ needs.version.outputs.version }}"
    ORG=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
    REPO="${{ github.repository }}"
    RELEASE_BASE="https://github.com/${REPO}/releases/download/v${VERSION}"

    # Prints the manifest digest of ghcr.io/<org>/csf-ce-<svc>:<version>.
    # Returns non-zero when the registry answer is not "<algo>:<hex>"
    # (for example an empty string or jq's literal "null").
    get_manifest_digest() {
      local svc=$1
      local image="ghcr.io/${ORG}/csf-ce-${svc}:${VERSION}"
      local digest
      digest=$(docker buildx imagetools inspect "${image}" \
        --format '{{json .Manifest}}' | jq -r '.digest') || digest=""
      if [[ ! "${digest}" =~ ^[a-z0-9]+:[0-9a-f]+$ ]]; then
        echo "error: could not resolve manifest digest for ${image} (got '${digest}')" >&2
        return 1
      fi
      printf '%s\n' "${digest}"
    }

    # Prints the checksum stored in /tmp/binaries/<binary>-<arch>.sha256.
    # Returns non-zero when the file is missing or does not start with a
    # 64-character hex digest.
    get_sha256() {
      local binary=$1
      local arch=$2
      local file="/tmp/binaries/${binary}-${arch}.sha256"
      local sum=""
      if [[ -r "${file}" ]]; then
        sum=$(awk 'NR == 1 {print $1}' "${file}")
      fi
      if [[ ! "${sum}" =~ ^[0-9a-fA-F]{64}$ ]]; then
        echo "error: missing or malformed checksum in ${file}" >&2
        return 1
      fi
      printf '%s\n' "${sum}"
    }

    # Resolve everything BEFORE writing the file: a failing $(...) inside a
    # heredoc does not fail the step and would silently commit empty values,
    # whereas a failing assignment aborts the script under `set -e`.
    digest_api_gateway=$(get_manifest_digest api-gateway)
    digest_registry=$(get_manifest_digest registry)
    digest_scheduler=$(get_manifest_digest scheduler)
    digest_volume_manager=$(get_manifest_digest volume-manager)
    digest_failover_controller=$(get_manifest_digest failover-controller)
    digest_sdn_controller=$(get_manifest_digest sdn-controller)
    sha_amd64=$(get_sha256 csf-agent amd64)
    sha_arm64=$(get_sha256 csf-agent arm64)

    cat > infra/versions.nix <<EOF
    {
      csf = {
        version = "${VERSION}";
        images = {
          api-gateway = { digest = "${digest_api_gateway}"; };
          registry = { digest = "${digest_registry}"; };
          scheduler = { digest = "${digest_scheduler}"; };
          volume-manager = { digest = "${digest_volume_manager}"; };
          failover-controller = { digest = "${digest_failover_controller}"; };
          sdn-controller = { digest = "${digest_sdn_controller}"; };
        };
        agent = {
          amd64 = {
            url = "${RELEASE_BASE}/csf-agent-amd64";
            sha256 = "${sha_amd64}";
          };
          arm64 = {
            url = "${RELEASE_BASE}/csf-agent-arm64";
            sha256 = "${sha_arm64}";
          };
        };
      };
    }
    EOF

- name: Commit and push versions.nix
  run: |
    VERSION="${{ needs.version.outputs.version }}"
    cd infra

    # Author the commit as the GitHub Actions bot identity.
    git config user.name "github-actions[bot]"
    git config user.email "github-actions[bot]@users.noreply.github.com"

    git add versions.nix

    # `git diff --cached --quiet` exits 0 when nothing is staged, in which
    # case there is nothing to commit and the step ends successfully.
    if git diff --cached --quiet; then
      echo "no changes"
      exit 0
    fi

    git commit -m "chore: update versions.nix for v${VERSION}"
    git push
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions agent/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ pub struct ContainerStatus {
pub status: String,
}

/// Body of a successful heartbeat reply, deserialized from the JSON response.
///
/// Both fields are optional, so a reply that omits them still parses.
#[derive(Debug, Deserialize)]
pub struct HeartbeatResponse {
/// Flake revision carried by the reply, if any.
/// NOTE(review): presumably the revision the gateway wants this agent to
/// run — confirm against the registry API.
pub desired_flake_rev: Option<String>,
/// Heartbeat counter carried by the reply, if any.
/// NOTE(review): presumably the number of heartbeats seen since the last
/// update — confirm against the registry API.
pub post_update_heartbeats: Option<u32>,
}

#[derive(Debug, Deserialize)]
pub struct AssignedWorkload {
pub id: String,
Expand Down Expand Up @@ -143,7 +149,7 @@ impl ApiClient {
api_key: &str,
container_statuses: Option<Vec<ContainerStatus>>,
metrics: Option<crate::system::SystemMetrics>,
) -> Result<()> {
) -> Result<HeartbeatResponse> {
let url = format!(
"{}/api/registry/agents/{}/heartbeat",
self.gateway_url, agent_id
Expand Down Expand Up @@ -189,7 +195,9 @@ impl ApiClient {
anyhow::bail!("Heartbeat failed status={}", status);
}

Ok(())
resp.json::<HeartbeatResponse>()
.await
.context("Failed to parse heartbeat response")
}

pub async fn fetch_assigned_workloads(
Expand Down
17 changes: 16 additions & 1 deletion agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod docker;
mod pki;
mod rbd;
mod system;
mod update_watch;

use anyhow::{Context, Result};
use std::collections::HashMap;
Expand Down Expand Up @@ -169,6 +170,7 @@ async fn run_heartbeat_loop(
) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
let mut failure_count: u32 = 0;
let mut current_flake_rev = String::new();

loop {
tokio::select! {
Expand All @@ -183,11 +185,24 @@ async fn run_heartbeat_loop(
let metrics = system::collect_metrics();

match client.heartbeat(agent_id, api_key, Some(statuses), Some(metrics)).await {
Ok(_) => {
Ok(resp) => {
if failure_count > 0 {
info!(agent_id = %agent_id, "Heartbeat recovered after {} failures", failure_count);
failure_count = 0;
}

if let Some(count) = resp.post_update_heartbeats {
update_watch::write_heartbeat_counter(count).await;
}

if let Some(rev) = resp.desired_flake_rev {
let rev_clone = rev.clone();
let current = current_flake_rev.clone();
tokio::spawn(async move {
update_watch::handle(agent_id, &rev_clone, &current).await;
});
current_flake_rev = rev;
}
}
Err(e) => {
failure_count += 1;
Expand Down
59 changes: 59 additions & 0 deletions agent/src/update_watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::time::Duration;
use tokio::fs;
use tracing::{info, warn};
use uuid::Uuid;

/// Path of the file that `write_trigger` fills with the desired flake revision.
const TRIGGER_FILE: &str = "/var/lib/csf/update_trigger";
/// Path of the file that `write_heartbeat_counter` overwrites with the latest count.
const HEARTBEAT_COUNTER_FILE: &str = "/var/lib/csf/post_update_heartbeats";
/// Exclusive upper bound, in seconds, of the delay returned by `jitter_delay`.
const MAX_JITTER_SECS: u64 = 300;

/// Reacts to a flake revision announced in a heartbeat response.
///
/// Does nothing when `desired_flake_rev` equals `current_flake_rev`, and
/// logs a warning and returns when the revision is not a 40-character hex
/// string. Otherwise it sleeps for the per-agent jitter derived from
/// `agent_id`, then writes the revision to the trigger file, logging the
/// outcome either way.
pub async fn handle(agent_id: Uuid, desired_flake_rev: &str, current_flake_rev: &str) {
    // Already tracking this revision: no update to schedule.
    if desired_flake_rev == current_flake_rev {
        return;
    }

    // Refuse anything that does not look like a full commit hash before it
    // reaches the trigger file.
    if !is_valid_sha(desired_flake_rev) {
        warn!(flake_rev = %desired_flake_rev, "received invalid flake rev in heartbeat response");
        return;
    }

    let delay_secs = jitter_delay(agent_id);
    info!(
        flake_rev = %desired_flake_rev,
        jitter_secs = delay_secs,
        "update signal received, waiting before writing trigger"
    );

    tokio::time::sleep(Duration::from_secs(delay_secs)).await;

    match write_trigger(desired_flake_rev).await {
        Ok(()) => {
            info!(flake_rev = %desired_flake_rev, "update trigger written");
        }
        Err(e) => {
            warn!(error = %e, flake_rev = %desired_flake_rev, "failed to write update trigger file");
        }
    }
}

pub async fn write_heartbeat_counter(count: u32) {
if let Some(parent) = std::path::Path::new(HEARTBEAT_COUNTER_FILE).parent() {
let _ = fs::create_dir_all(parent).await;
}
let _ = fs::write(HEARTBEAT_COUNTER_FILE, count.to_string()).await;
}

/// Writes `flake_rev` to `TRIGGER_FILE`, creating the parent directory first.
///
/// # Errors
/// Returns the underlying I/O error if the directory cannot be created or
/// the file cannot be written.
async fn write_trigger(flake_rev: &str) -> anyhow::Result<()> {
    let trigger_path = std::path::Path::new(TRIGGER_FILE);

    if let Some(dir) = trigger_path.parent() {
        fs::create_dir_all(dir).await?;
    }

    fs::write(trigger_path, flake_rev).await?;
    Ok(())
}

/// Derives a delay in seconds from the agent id.
///
/// The first eight bytes of the UUID are read as a little-endian `u64` and
/// reduced modulo `MAX_JITTER_SECS`, so the same agent always gets the same
/// delay and the result is always below `MAX_JITTER_SECS`.
fn jitter_delay(agent_id: Uuid) -> u64 {
    let mut seed = [0u8; 8];
    seed.copy_from_slice(&agent_id.as_bytes()[..8]);
    u64::from_le_bytes(seed) % MAX_JITTER_SECS
}

/// Returns `true` when `rev` is exactly 40 ASCII hexadecimal digits
/// (upper or lower case), the textual form of a full SHA-1 commit hash.
fn is_valid_sha(rev: &str) -> bool {
    if rev.len() != 40 {
        return false;
    }
    // Any byte of a multi-byte UTF-8 character is >= 0x80 and therefore
    // never a hex digit, so a byte-wise check matches a char-wise one.
    rev.bytes().all(|b| b.is_ascii_hexdigit())
}
Loading
Loading