Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 38 additions & 38 deletions .github/workflows/build_image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ jobs:
find target/aarch64-unknown-linux-gnu/release/deps -name "*.rlib" -not -name "libvector*.rlib" -delete 2>/dev/null || true
find target/aarch64-unknown-linux-gnu/release/build -type f -name "*.o" -delete 2>/dev/null || true

- name: Build armv7 binary (standard)
timeout-minutes: 90
env:
CARGO_PROFILE_RELEASE_CODEGEN_UNITS: 16
CARGO_PROFILE_RELEASE_LTO: "thin"
CARGO_BUILD_JOBS: 4
CARGO_INCREMENTAL: 0
run: |
echo "Starting armv7 build at $(date)"
make build-armv7-unknown-linux-gnueabihf
echo "Finished armv7 build at $(date)"
# Clean up intermediate files to save disk space
find target/armv7-unknown-linux-gnueabihf/release/deps -name "*.rlib" -not -name "libvector*.rlib" -delete 2>/dev/null || true
find target/armv7-unknown-linux-gnueabihf/release/build -type f -name "*.o" -delete 2>/dev/null || true
# - name: Build armv7 binary (standard)
# timeout-minutes: 90
# env:
# CARGO_PROFILE_RELEASE_CODEGEN_UNITS: 16
# CARGO_PROFILE_RELEASE_LTO: "thin"
# CARGO_BUILD_JOBS: 4
# CARGO_INCREMENTAL: 0
# run: |
# echo "Starting armv7 build at $(date)"
# make build-armv7-unknown-linux-gnueabihf
# echo "Finished armv7 build at $(date)"
# # Clean up intermediate files to save disk space
# find target/armv7-unknown-linux-gnueabihf/release/deps -name "*.rlib" -not -name "libvector*.rlib" -delete 2>/dev/null || true
# find target/armv7-unknown-linux-gnueabihf/release/build -type f -name "*.o" -delete 2>/dev/null || true

- name: Build and push standard image
env:
Expand Down Expand Up @@ -177,20 +177,20 @@ jobs:
find target/aarch64-unknown-linux-gnu/release/deps -name "*.rlib" -not -name "libvector*.rlib" -delete 2>/dev/null || true
find target/aarch64-unknown-linux-gnu/release/build -type f -name "*.o" -delete 2>/dev/null || true

- name: Build armv7 binary (nextgen)
timeout-minutes: 90
env:
CARGO_PROFILE_RELEASE_CODEGEN_UNITS: 16
CARGO_PROFILE_RELEASE_LTO: "thin"
CARGO_BUILD_JOBS: 4
CARGO_INCREMENTAL: 0
run: |
echo "Starting armv7 nextgen build at $(date)"
make build-armv7-unknown-linux-gnueabihf-nextgen
echo "Finished armv7 nextgen build at $(date)"
# Clean up intermediate files to save disk space
find target/armv7-unknown-linux-gnueabihf/release/deps -name "*.rlib" -not -name "libvector*.rlib" -delete 2>/dev/null || true
find target/armv7-unknown-linux-gnueabihf/release/build -type f -name "*.o" -delete 2>/dev/null || true
# - name: Build armv7 binary (nextgen)
# timeout-minutes: 90
# env:
# CARGO_PROFILE_RELEASE_CODEGEN_UNITS: 16
# CARGO_PROFILE_RELEASE_LTO: "thin"
# CARGO_BUILD_JOBS: 4
# CARGO_INCREMENTAL: 0
# run: |
# echo "Starting armv7 nextgen build at $(date)"
# make build-armv7-unknown-linux-gnueabihf-nextgen
# echo "Finished armv7 nextgen build at $(date)"
# # Clean up intermediate files to save disk space
# find target/armv7-unknown-linux-gnueabihf/release/deps -name "*.rlib" -not -name "libvector*.rlib" -delete 2>/dev/null || true
# find target/armv7-unknown-linux-gnueabihf/release/build -type f -name "*.o" -delete 2>/dev/null || true

- name: Check nextgen binaries before building image
run: |
Expand All @@ -211,18 +211,18 @@ jobs:
else
echo " ❌ NOT FOUND"
fi
echo ""
echo "armv7 binary:"
if [ -f target/armv7-unknown-linux-gnueabihf/release/vector-nextgen ]; then
ls -lh target/armv7-unknown-linux-gnueabihf/release/vector-nextgen
echo " ✅ EXISTS"
else
echo " ❌ NOT FOUND"
fi
# echo ""
# echo "armv7 binary:"
# if [ -f target/armv7-unknown-linux-gnueabihf/release/vector-nextgen ]; then
# ls -lh target/armv7-unknown-linux-gnueabihf/release/vector-nextgen
# echo " ✅ EXISTS"
# else
# echo " ❌ NOT FOUND"
# fi
echo ""
if [ -f target/x86_64-unknown-linux-gnu/release/vector-nextgen ] && \
[ -f target/aarch64-unknown-linux-gnu/release/vector-nextgen ] && \
[ -f target/armv7-unknown-linux-gnueabihf/release/vector-nextgen ]; then
[ -f target/aarch64-unknown-linux-gnu/release/vector-nextgen ]; then
# [ -f target/armv7-unknown-linux-gnueabihf/release/vector-nextgen ]; then
echo "✅ All nextgen binaries exist - Makefile should skip rebuild"
else
echo "⚠️ Some binaries missing - Makefile will trigger rebuild"
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,15 @@ cargo-install-%:
.PHONY: release-docker
release-docker: target/x86_64-unknown-linux-gnu/release/vector
release-docker: target/aarch64-unknown-linux-gnu/release/vector
release-docker: target/armv7-unknown-linux-gnueabihf/release/vector
#release-docker: target/armv7-unknown-linux-gnueabihf/release/vector
@echo "Releasing docker image..."
@scripts/release-docker.sh
@echo "Done releasing docker image."

.PHONY: release-docker-nextgen
release-docker-nextgen: target/x86_64-unknown-linux-gnu/release/vector-nextgen
release-docker-nextgen: target/aarch64-unknown-linux-gnu/release/vector-nextgen
release-docker-nextgen: target/armv7-unknown-linux-gnueabihf/release/vector-nextgen
# release-docker-nextgen: target/armv7-unknown-linux-gnueabihf/release/vector-nextgen
@echo "Releasing docker image (nextgen mode)..."
@NEXTGEN=true scripts/release-docker.sh
@echo "Done releasing docker image (nextgen mode)."
Expand Down
6 changes: 1 addition & 5 deletions proto/tidb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ message TopRURecord {
repeated TopRURecordItem items = 5;
}

message ReportTopRURecords {
repeated TopRURecord records = 1;
}

// TopRURecordItem represents statistics within a single time bucket.
message TopRURecordItem {
uint64 timestamp_sec = 1; // timestamp in second
Expand Down Expand Up @@ -117,6 +113,6 @@ message TopSQLSubResponse {
TopSQLRecord record = 1;
SQLMeta sql_meta = 2;
PlanMeta plan_meta = 3;
ReportTopRURecords top_ru_records = 4;
TopRURecord ru_record = 4;
}
}
6 changes: 3 additions & 3 deletions scripts/release-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ BINARY_NAME="${NEXTGEN:+vector-nextgen}"
BINARY_NAME="${BINARY_NAME:-vector}"
cp target/x86_64-unknown-linux-gnu/release/${BINARY_NAME} "$WORK_DIR"/vector-amd64
cp target/aarch64-unknown-linux-gnu/release/${BINARY_NAME} "$WORK_DIR"/vector-arm64
cp target/armv7-unknown-linux-gnueabihf/release/${BINARY_NAME} "$WORK_DIR"/vector-arm
# cp target/armv7-unknown-linux-gnueabihf/release/${BINARY_NAME} "$WORK_DIR"/vector-arm
# cp config/vector.toml "$WORK_DIR"

VERSION="${VECTOR_VERSION:-"$(scripts/version.sh)"}"
Expand All @@ -45,7 +45,7 @@ BASE=debian
TAG="${TAG:-$REPO:$VERSION-$BASE}"
DOCKERFILE="scripts/docker/Dockerfile"

PLATFORMS="linux/amd64,linux/arm64,linux/arm/v7"
#PLATFORMS="linux/amd64,linux/arm64"
#PLATFORMS="linux/amd64,linux/arm64,linux/arm/v7"
PLATFORMS="linux/amd64,linux/arm64"
echo "Building docker image: $TAG for $PLATFORMS"
docker buildx build --push --platform="$PLATFORMS" -t "$TAG" -f "$DOCKERFILE" "$WORK_DIR"
4 changes: 2 additions & 2 deletions src/sinks/topsql_data_deltalake/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,12 @@ impl TopSQLDeltaLakeSink {
}
};

let type_dir = format!("type=topsql_{}", table_type);
let type_dir = format!("component={}", table_type);
let instance_dir = format!("instance={}", table_instance);

let table_path = if self.base_path.to_string_lossy().starts_with("s3://") {
// For S3 paths, build a partition-like directory structure
// <base>/topsql/data/type=.../instance=.../<table_name>
// <base>/topsql/data/component=.../instance=.../<table_name>
let base = self.base_path.to_string_lossy();
let base = base.trim_end_matches('/');
PathBuf::from(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/topsql_meta_deltalake/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,13 @@ impl TopSQLDeltaLakeSink {
let table_path = if self.base_path.to_string_lossy().starts_with("s3://") {
// For S3 paths, append the table name to the S3 path
PathBuf::from(format!(
"{}/type={}",
"{}/component={}",
self.base_path.to_string_lossy(),
table_name
))
} else {
// For local paths, use join as before
self.base_path.join(format!("type={}", table_name))
self.base_path.join(format!("component={}", table_name))
};

let table_config = self
Expand Down
107 changes: 81 additions & 26 deletions src/sources/conprof/tools/jeprof_native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@ fn address_sub_one(hex_addr: &str) -> Option<String> {
Some(format!("{:0width$x}", r, width = ADDRESS_LENGTH))
}

/// Parse pprof heap profile text format and collect unique PCs (call sites).
/// Lines: optional % commands, then header "heap profile: ...", then
/// "\s*(\d+):\s*(\d+)\s*\[\s*(\d+):\s*(\d+)\]\s*@\s*(.*)" with addresses after @.
/// FixCallerAddresses: subtract 1 from each address except the first.
/// Parse pprof heap text format and collect unique PCs (call sites).
///
/// Supports both:
/// 1) Heap profile entries:
/// "\s*(\d+):\s*(\d+)\s*\[\s*(\d+):\s*(\d+)\]\s*@\s*(addr1 addr2 ...)"
/// 2) Remote threaded heap v2 ("heap_v2/<rate>"):
/// - first section header: "heap_v2/<rate>"
/// - stack is provided on separate lines starting with "@ addr1 addr2 ..."
///
/// FixCallerAddresses (jeprof): subtract 1 from each address except the first.
/// Returns sorted unique PCs as 0-padded hex strings (no 0x prefix, for consistent ordering).
fn parse_heap_profile_for_pcs(body: &[u8]) -> Option<Vec<String>> {
let text = str::from_utf8(body).ok()?;
Expand All @@ -60,33 +66,63 @@ fn parse_heap_profile_for_pcs(body: &[u8]) -> Option<Vec<String>> {
continue;
}
if !past_header {
if line.starts_with("heap profile:") || line.starts_with("heap ") {
if line.starts_with("heap profile:")
|| line.starts_with("heap ")
|| line.starts_with("heap_v2/")
{
past_header = true;
}
continue;
}
if line.starts_with("MAPPED_LIBRARIES:") || line.starts_with("--- Memory map:") {
break;
}
// Match: optional whitespace, count1: bytes1 [ count2: bytes2 ] @ addr1 addr2 ...

let rest = line.trim_start();
let at_pos = rest.find(" @ ")?;
let stack_part = rest.get(at_pos + 3..)?.trim();
if stack_part.is_empty() {
continue;
}
let addrs: Vec<&str> = stack_part.split_whitespace().collect();
if addrs.is_empty() {

// heap_v2 threaded format:
// "@ addr1 addr2 ..." (stack for the following t*: lines)
if rest.starts_with('@') {
let stack_part = rest.trim_start_matches('@').trim();
if stack_part.is_empty() {
continue;
}
let addrs: Vec<&str> = stack_part.split_whitespace().collect();
if addrs.is_empty() {
continue;
}
for (i, addr) in addrs.iter().enumerate() {
let extended = hex_extend(addr)?;
let fixed = if i == 0 {
extended
} else {
address_sub_one(&extended).unwrap_or(extended)
};
pcs.insert(fixed);
}
continue;
}
for (i, addr) in addrs.iter().enumerate() {
let extended = hex_extend(addr)?;
let fixed = if i == 0 {
extended
} else {
address_sub_one(&extended).unwrap_or(extended)
};
pcs.insert(fixed);

// heap profile entry format:
// optional whitespace, count1: bytes1 [ count2: bytes2 ] @ addr1 addr2 ...
if let Some(at_pos) = rest.find(" @ ") {
let stack_part = rest.get(at_pos + 3..)?.trim();
if stack_part.is_empty() {
continue;
}
let addrs: Vec<&str> = stack_part.split_whitespace().collect();
if addrs.is_empty() {
continue;
}
for (i, addr) in addrs.iter().enumerate() {
let extended = hex_extend(addr)?;
let fixed = if i == 0 {
extended
} else {
address_sub_one(&extended).unwrap_or(extended)
};
pcs.insert(fixed);
}
}
}

Expand Down Expand Up @@ -187,14 +223,16 @@ fn build_symbolized_output(
out.extend_from_slice(program_name.as_bytes());
out.push(b'\n');
for pc in pcs {
let sym = symbol_map
.get(pc)
.map(|s| s.as_str())
.unwrap_or("0x");
out.extend_from_slice(b"0x");
out.extend_from_slice(pc.as_bytes());
out.push(b' ');
out.extend_from_slice(sym.as_bytes());
if let Some(s) = symbol_map.get(pc) {
out.extend_from_slice(s.as_bytes());
} else {
// Match Perl: when symbol is missing, use address as the symbol (0x<pc>)
out.extend_from_slice(b"0x");
out.extend_from_slice(pc.as_bytes());
}
out.push(b'\n');
}
out.extend_from_slice(b"---\n");
Expand All @@ -205,12 +243,15 @@ fn build_symbolized_output(

/// Full native jeprof --raw flow: GET heap -> parse PCs -> fetch symbols + cmdline -> build output.
/// If profile is binary or parsing yields no PCs, returns raw body only (no symbol header).
/// Requesting Accept: text/plain ensures the server returns pprof text format (e.g. "heap profile: ... @ 0x...")
/// so we can parse PCs and add the symbol header; otherwise some servers return jemalloc heap_v2 and we skip symbolization.
pub async fn fetch_raw_symbolized(
client: &Client,
heap_url: &str,
) -> Result<Vec<u8>, String> {
let body = client
.get(heap_url)
.header("Accept", "text/plain")
.send()
.await
.map_err(|e| format!("http request failed: {}", e))?;
Expand Down Expand Up @@ -275,6 +316,20 @@ mod tests {
assert!(pcs.iter().any(|s| s.contains("12345") || s.ends_with("12345")));
}

#[test]
fn test_parse_heap_v2_at_lines() {
    // Remote threaded heap_v2 format: a "heap_v2/<rate>" header, a per-thread
    // counts line, then the stack supplied on a separate "@ addr1 addr2 ..." line.
    let body = b"heap_v2/524288
t*: 1: 2 [ 0: 0]
@ 0x12345 0x67890 0xabc
";
    let pcs = parse_heap_profile_for_pcs(body).expect("heap_v2 body should parse");
    assert!(pcs.len() >= 2);
    // The leading (innermost) address is kept verbatim — no FixCallerAddresses adjustment.
    assert!(pcs.iter().any(|pc| pc.ends_with("12345")));
    // Caller addresses have 1 subtracted, so 0x67890 becomes 0x6788f.
    assert!(pcs.iter().any(|pc| pc.ends_with("6788f")));
}

#[test]
fn test_base_url_from_heap_url() {
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion src/sources/topsql/upstream/tidb/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl UpstreamEventParser for TopSqlSubResponseParser {
}
Some(RespOneof::SqlMeta(sql_meta)) => Self::parse_tidb_sql_meta(sql_meta),
Some(RespOneof::PlanMeta(plan_meta)) => Self::parse_tidb_plan_meta(plan_meta),
Some(RespOneof::TopRuRecords(_)) => vec![], // TODO: implement TopRURecords parsing
Some(RespOneof::RuRecord(_)) => vec![], // TODO: implement TopRU record parsing
None => vec![],
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/sources/topsql/upstream/tidb/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ impl ByteSizeOf for RespOneof {
RespOneof::PlanMeta(plan_meta) => {
plan_meta.plan_digest.len() + plan_meta.normalized_plan.len()
}
RespOneof::TopRuRecords(top_ru_records) => {
top_ru_records.records.size_of()
}
RespOneof::RuRecord(ru_record) => ru_record.size_of(),
}
}
}
Expand Down
Loading
Loading