Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ python -m telemetry_window_demo.cli run --config configs/richer_sample.yaml

That scenario pack reads `data/raw/richer_sample_events.jsonl` and writes outputs to `data/processed/richer_sample/`.
It currently produces `28` normalized events, `24` windows, and `8` alerts.
Both sample paths also emit a compact `summary.json` alongside the CSV and PNG outputs.

## Current behavior

Expand Down Expand Up @@ -64,13 +65,16 @@ The richer scenario pack uses a longer `120` second cooldown so the output stays

## Outputs

Running the default command regenerates:

- `data/processed/features.csv`
- `data/processed/alerts.csv`
- `data/processed/event_count_timeline.png`
- `data/processed/error_rate_timeline.png`
- `data/processed/alerts_timeline.png`
Running the default command regenerates:

- `data/processed/features.csv`
- `data/processed/alerts.csv`
- `data/processed/summary.json`
- `data/processed/event_count_timeline.png`
- `data/processed/error_rate_timeline.png`
- `data/processed/alerts_timeline.png`

The summary artifact includes the input path, output directory, normalized event count, window count, feature row count, alert count, triggered rule names and counts, cooldown setting, and generated artifact paths.

## Scope

Expand Down
33 changes: 33 additions & 0 deletions data/processed/richer_sample/summary.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"input_path": "data/raw/richer_sample_events.jsonl",
"output_dir": "data/processed/richer_sample",
"normalized_event_count": 28,
"window_count": 24,
"feature_row_count": 24,
"alert_count": 8,
"triggered_rule_names": [
"high_error_rate",
"high_severity_spike",
"login_fail_burst",
"persistent_high_error",
"rare_event_repeat_malware_alert",
"rare_event_repeat_policy_denied"
],
"triggered_rule_counts": {
"high_error_rate": 2,
"high_severity_spike": 1,
"login_fail_burst": 1,
"persistent_high_error": 2,
"rare_event_repeat_malware_alert": 1,
"rare_event_repeat_policy_denied": 1
},
"cooldown_seconds": 120,
"generated_artifacts": [
"data/processed/richer_sample/features.csv",
"data/processed/richer_sample/alerts.csv",
"data/processed/richer_sample/summary.json",
"data/processed/richer_sample/event_count_timeline.png",
"data/processed/richer_sample/error_rate_timeline.png",
"data/processed/richer_sample/alerts_timeline.png"
]
}
33 changes: 33 additions & 0 deletions data/processed/summary.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"input_path": "data/raw/sample_events.jsonl",
"output_dir": "data/processed",
"normalized_event_count": 41,
"window_count": 24,
"feature_row_count": 24,
"alert_count": 12,
"triggered_rule_names": [
"high_error_rate",
"high_severity_spike",
"login_fail_burst",
"persistent_high_error",
"rare_event_repeat_malware_alert",
"source_spread_spike"
],
"triggered_rule_counts": {
"high_error_rate": 3,
"high_severity_spike": 2,
"login_fail_burst": 2,
"persistent_high_error": 3,
"rare_event_repeat_malware_alert": 1,
"source_spread_spike": 1
},
"cooldown_seconds": 60,
"generated_artifacts": [
"data/processed/features.csv",
"data/processed/alerts.csv",
"data/processed/summary.json",
"data/processed/event_count_timeline.png",
"data/processed/error_rate_timeline.png",
"data/processed/alerts_timeline.png"
]
}
264 changes: 162 additions & 102 deletions src/telemetry_window_demo/cli.py
Original file line number Diff line number Diff line change
@@ -1,128 +1,146 @@
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Any

from .features import compute_window_features
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Any
from .features import compute_window_features
from .io import (
format_timestamp,
load_alert_table,
load_config,
load_events,
load_feature_table,
resolve_config_path,
write_json,
write_table,
)
from .preprocess import normalize_events
from .rules import apply_rules
from .visualize import plot_outputs
from .windowing import build_windows


def main() -> None:
    """Entry point: parse CLI arguments and dispatch to the chosen subcommand."""
    cli_args = build_parser().parse_args()
    cli_args.func(cli_args)


def build_parser() -> argparse.ArgumentParser:
    """Construct the top-level parser with the run/summarize/plot subcommands."""
    parser = argparse.ArgumentParser(
        prog="telemetry-window-demo",
        description="Windowed telemetry analytics on timestamped event streams.",
    )
    commands = parser.add_subparsers(dest="command", required=True)

    # `run` executes the full pipeline from a YAML config.
    run = commands.add_parser("run", help="Run the full telemetry pipeline.")
    run.add_argument("--config", required=True, help="Path to a YAML config file.")
    run.set_defaults(func=run_command)

    # `summarize` prints quick stats about a raw event file.
    summarize = commands.add_parser(
        "summarize",
        help="Summarize an input event file.",
    )
    summarize.add_argument("--input", required=True, help="Path to .jsonl or .csv.")
    summarize.set_defaults(func=summarize_command)

    # `plot` re-renders plots from previously written CSV outputs.
    plot = commands.add_parser("plot", help="Render plots from CSV outputs.")
    plot.add_argument("--features", required=True, help="Path to features.csv.")
    plot.add_argument("--alerts", help="Path to alerts.csv.")
    plot.add_argument(
        "--output-dir",
        default="data/processed",
        help="Directory where plot images will be written.",
    )
    plot.set_defaults(func=plot_command)

    return parser


from .preprocess import normalize_events
from .rules import apply_rules
from .visualize import plot_outputs
from .windowing import build_windows
def main() -> None:
    """CLI entry point: build the parser, parse argv, and invoke the handler."""
    namespace = build_parser().parse_args()
    namespace.func(namespace)
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser: one required subcommand (run/summarize/plot)."""
    parser = argparse.ArgumentParser(
        prog="telemetry-window-demo",
        description="Windowed telemetry analytics on timestamped event streams.",
    )
    subcommands = parser.add_subparsers(dest="command", required=True)

    pipeline = subcommands.add_parser("run", help="Run the full telemetry pipeline.")
    pipeline.add_argument("--config", required=True, help="Path to a YAML config file.")
    pipeline.set_defaults(func=run_command)

    summary = subcommands.add_parser(
        "summarize",
        help="Summarize an input event file.",
    )
    summary.add_argument("--input", required=True, help="Path to .jsonl or .csv.")
    summary.set_defaults(func=summarize_command)

    render = subcommands.add_parser("plot", help="Render plots from CSV outputs.")
    render.add_argument("--features", required=True, help="Path to features.csv.")
    render.add_argument("--alerts", help="Path to alerts.csv.")
    render.add_argument(
        "--output-dir",
        default="data/processed",
        help="Directory where plot images will be written.",
    )
    render.set_defaults(func=plot_command)

    return parser
def run_command(args: argparse.Namespace) -> None:
    """Run the full pipeline: load -> normalize -> window -> features -> rules -> artifacts.

    Reads the YAML config named by ``args.config``, resolves the input and
    output paths relative to it, writes features.csv, alerts.csv, the plot
    PNGs, and a summary.json, then prints a short progress report.

    Note: the pasted diff had duplicated both sides of the hunk — the pipeline
    stages ran twice and ``alerts`` was computed once with
    ``config.get("rules")`` (which may be ``None``) and once with
    ``rules_config``. This is the deduplicated new-side version: each stage
    runs exactly once, and rules always come from ``rules_config``.
    """
    config_path = Path(args.config).resolve()
    config = load_config(config_path)
    time_config = config.get("time", {})
    feature_config = config.get("features", {})
    # `or {}` guards against an explicit `rules: null` in the YAML.
    rules_config = config.get("rules") or {}
    input_path = resolve_config_path(config_path, config["input_path"])
    output_dir = resolve_config_path(config_path, config.get("output_dir", "data/processed"))

    events = load_events(input_path)
    normalized = normalize_events(
        events,
        timestamp_col=time_config.get("timestamp_col", "timestamp"),
        error_statuses=feature_config.get("error_statuses"),
        high_severity_levels=feature_config.get("severity_levels"),
    )
    windows = build_windows(
        normalized,
        timestamp_col=time_config.get("timestamp_col", "timestamp"),
        window_size_seconds=int(time_config.get("window_size_seconds", 60)),
        step_size_seconds=int(time_config.get("step_size_seconds", 10)),
    )
    features = compute_window_features(
        normalized,
        windows,
        count_event_types=feature_config.get("count_event_types"),
    )
    alerts = apply_rules(features, rules_config)
    cooldown_seconds = int(rules_config.get("cooldown_seconds", 0))

    feature_path = write_table(features, output_dir / "features.csv")
    alert_path = write_table(alerts, output_dir / "alerts.csv")
    plot_paths = plot_outputs(features, alerts, output_dir)

    # Emit the machine-readable run summary alongside the CSV/PNG outputs.
    summary_path = output_dir / "summary.json"
    summary = _build_run_summary(
        input_path=input_path,
        output_dir=output_dir,
        normalized=normalized,
        windows=windows,
        features=features,
        alerts=alerts,
        cooldown_seconds=cooldown_seconds,
        feature_path=feature_path,
        alert_path=alert_path,
        summary_path=summary_path,
        plot_paths=plot_paths,
    )
    write_json(summary, summary_path)

    print(f"[OK] Loaded {len(normalized)} events")
    print(f"[OK] Generated {len(features)} windows")
    # Subtract the two window-boundary columns to count feature columns only.
    print(f"[OK] Computed {max(len(features.columns) - 2, 0)} features per window")
    print(f"[OK] Triggered {len(alerts)} alerts")
    print(f"[OK] Saved {feature_path.name}, {alert_path.name}")
    print(f"[OK] Saved plots to {_display_path(output_dir)}")
    for plot_path in plot_paths:
        print(f" - {plot_path.name}")


def summarize_command(args: argparse.Namespace) -> None:
    """Print a quick textual summary of the event file named by ``args.input``."""
    normalized = normalize_events(load_events(args.input))
    span_start = format_timestamp(normalized["timestamp"].min())
    span_end = format_timestamp(normalized["timestamp"].max())
    # Guard against an empty frame: mean() over zero rows would be NaN.
    error_rate = 0.0 if normalized.empty else float(normalized["is_error"].mean())
    top_types = normalized["event_type"].value_counts().head(5).to_dict()

    print(f"events: {len(normalized)}")
    print(f"time_range: {span_start} -> {span_end}")
    print(f"unique_sources: {normalized['source'].nunique()}")
    print(f"unique_targets: {normalized['target'].nunique()}")
    print(f"overall_error_rate: {error_rate:.2f}")
    print(f"top_event_types: {top_types}")


def plot_command(args: argparse.Namespace) -> None:
    """Render timeline plots from previously written CSV outputs."""
    features = load_feature_table(args.features)
    # Default to an alerts.csv sitting next to the features file.
    if args.alerts:
        alerts = load_alert_table(args.alerts)
    else:
        alerts = load_alert_table(Path(args.features).with_name("alerts.csv"))
    plot_paths = plot_outputs(features, alerts, args.output_dir)
    print(f"[OK] Saved plots to {_display_path(Path(args.output_dir).resolve())}")
    for plot_path in plot_paths:
        print(f" - {plot_path.name}")


def _display_path(path: Path) -> str:
cwd = Path.cwd().resolve()
resolved = path.resolve()
Expand All @@ -132,5 +150,47 @@ def _display_path(path: Path) -> str:
return resolved.as_posix()


# Script entry point when the module is executed directly.
if __name__ == "__main__":
    main()
def _build_run_summary(
    input_path: Path,
    output_dir: Path,
    normalized: Any,
    windows: list[Any],
    features: Any,
    alerts: Any,
    cooldown_seconds: int,
    feature_path: Path,
    alert_path: Path,
    summary_path: Path,
    plot_paths: list[Path],
) -> dict[str, object]:
    """Assemble the JSON-serializable run summary written next to the CSV/PNG outputs.

    Counts alerts per rule (sorted by rule name), lists every generated
    artifact path, and records the input/output locations plus headline
    counts for the run.
    """
    rule_counts: dict[str, int] = {}
    if not alerts.empty:
        # value_counts gives per-rule totals; sort_index fixes a stable order.
        counted = alerts["rule_name"].value_counts().sort_index()
        rule_counts = {str(name): int(total) for name, total in counted.items()}

    artifact_paths = [feature_path, alert_path, summary_path] + list(plot_paths)

    return {
        "input_path": _display_path(input_path),
        "output_dir": _display_path(output_dir),
        "normalized_event_count": int(len(normalized)),
        "window_count": int(len(windows)),
        "feature_row_count": int(len(features)),
        "alert_count": int(len(alerts)),
        "triggered_rule_names": sorted(rule_counts),
        "triggered_rule_counts": rule_counts,
        "cooldown_seconds": int(cooldown_seconds),
        "generated_artifacts": [_display_path(path) for path in artifact_paths],
    }


# Script entry point when the module is executed directly.
if __name__ == "__main__":
    main()
Loading
Loading