Skip to content

feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink creates#3140

Open
ryerraguntla wants to merge 37 commits intoapache:masterfrom
ryerraguntla:feat/influxdb_v2_v3_connector
Open

feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink creates#3140
ryerraguntla wants to merge 37 commits intoapache:masterfrom
ryerraguntla:feat/influxdb_v2_v3_connector

Conversation

@ryerraguntla
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes # 3062

Rationale

This PR implements a unified InfluxDB connector supporting both InfluxDB V2 (Flux) and V3 (SQL) in a single crate per component (sink/source), eliminating code duplication while preserving full backward compatibility with existing V2 deployments.

Key Features:

  • Zero breaking changes for V2 users (backward-compatible config deserialization)
  • V3 stuck-timestamp detection with automatic batch inflation + circuit breaker
  • Performance improvements: SIMD JSON parsing (+40% in source), inlined hot paths (+3% in sink)
  • Enhanced safety: #[must_use] on critical functions, version-strict cursor validation
  • 95%+ test coverage maintained with 55+ new tests

What changed?

Architecture
Before (V2-only):
influxdb_sink/src/lib.rs (single flat config, 1,625 LOC)
influxdb_source/src/lib.rs (single flat config, 1,400 LOC)

After (V2 + V3)
influxdb_sink/src/
├── lib.rs (enum dispatch, 1,330 LOC)
└── protocol.rs (shared line-protocol escaping, 115 LOC)

influxdb_source/src/
├── lib.rs (enum dispatch, 817 LOC)
├── common.rs (shared config/validation, 815 LOC)
├── row.rs (CSV/JSONL parsing, 193 LOC)
├── v2.rs (Flux query logic, 374 LOC)
└── v3.rs (SQL query + stuck detection, 506 LOC)

Benefits:

Single .so per component (no InfluxClient trait overhead)
Zero code duplication (shared validation, escaping, retry logic)
Asymmetric structure (sink: 30-line diff; source: separate modules for V2/V3 query semantics)

For more details , please refer to the #3062 Comments section.
#3062 (comment)

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

AI Tools Used - Claude and Copilot

Scope of usage - Code review for quality and identifying performance issues. Generation of test cases, Documentation and summary notes generation.

Generated code is tested with actual test execution.

Can you explain every line of the code - yes

GaneshPatil7517 and others added 30 commits January 13, 2026 08:43
Implements Issue apache#2540 - Redshift Sink Connector with S3 staging support.

Features:
- S3 staging with automatic CSV file upload
- Redshift COPY command execution via PostgreSQL wire protocol
- IAM role authentication (recommended) or access key credentials
- Configurable batch size and compression (gzip, lzop, bzip2, zstd)
- Automatic table creation with customizable schema
- Retry logic with exponential backoff for transient failures
- Automatic cleanup of staged S3 files

Configuration options:
- connection_string: Redshift cluster connection URL
- target_table: Destination table name
- iam_role: IAM role ARN for S3 access (recommended)
- s3_bucket/s3_region/s3_prefix: S3 staging location
- batch_size: Messages per batch (default: 10000)
- compression: COPY compression format
- delete_staged_files: Auto-cleanup toggle (default: true)
- auto_create_table: Create table if missing (default: true)

Closes apache#2540
- Fix markdown lint issues in README.md (table formatting, blank lines, code fence language)
- Fix trailing newline in Cargo.toml
- Apply TOML formatting via taplo
- Add missing dependencies to DEPENDENCIES.md (rust-s3, rxml, rxml_validation, static_assertions)
- Add Redshift sink integration test using PostgreSQL (Redshift-compatible) and LocalStack for S3
- Add s3_endpoint config option to support custom endpoints (LocalStack, MinIO)
- Add path-style S3 access for custom endpoints
- Add localstack feature to testcontainers-modules
- Create test configuration files for Redshift connector
- Add s3_endpoint: None to test_config() in lib.rs (fixes E0063)
- Add endpoint parameter to S3Uploader tests in s3.rs
- Fix formatting for long line in init_s3_uploader()
- Add iggy_connector_redshift_sink to DEPENDENCIES.md
- Add maybe-async, md5, minidom to DEPENDENCIES.md
Critical fixes:
- Change Rust edition from 2024 to 2021 in Cargo.toml
- Fix S3 cleanup to happen regardless of COPY result (prevents orphaned files)

Moderate fixes:
- Remove zstd from valid compression options (not supported by Redshift)
- Update README to remove zstd from compression list
- Handle bucket creation error in integration tests with expect()
- Log JSON serialization errors instead of silent unwrap_or_default()

Performance:
- Cache escaped quote string to avoid repeated format! allocations

Windows compatibility (for local testing):
- Add #[cfg(unix)] conditionals for Unix-specific code in sender/mod.rs
Fixes clippy warning about unused 'runtime' field in test setup struct.
The runtime field is kept for future test expansion.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Changed CONFIG_ to PLUGIN_CONFIG_ for plugin configuration fields
- Changed TOPICS_0 to TOPICS with proper JSON array format
- Added CONSUMER_GROUP environment variable
…ort with latest S3 crate

Migrate S3 usage from rust-s3 to s3-tokio and update related dependencies. Top-level Cargo.toml updated (http, lz4_flex, toml) and DEPENDENCIES.md adjusted. redshift_sink/Cargo.toml switched to s3-tokio, made sqlx a workspace dependency and added rustls as a dev-dependency. Code changes: S3Uploader now owns Bucket (removed Box) and tests install the rustls crypto provider. Integration tests were refactored to remove the manual testcontainers setup in favor of the iggy_harness-based test harness.
Introduce a new core/connectors/influxdb_common crate that provides a version-abstraction layer for InfluxDB (InfluxDB V2 and V3). Adds the InfluxDbAdapter trait, ApiVersion factory, line-protocol escaping helpers, CSV/JSONL response parsers, and concrete V2/V3 adapters plus unit tests and architecture notes. Wire the new crate into the workspace (Cargo.toml/Cargo.lock) and update existing influxdb sink/source connector manifests/sources to depend on it. Also add integration test fixtures and v3-specific integration tests and configs to exercise V3 behavior.
Remove the external influxdb common adapter and refactor the sink to natively support both V2 and V3 configurations.

Key changes:
- Removed iggy_connector_influxdb_common dependency (Cargo.toml & Cargo.lock) and inlined adapter logic.
- Introduced InfluxDbSinkConfig enum with V2/V3 variants and helper methods (url, auth header, build_write_url/health_url, precision mapping, feature flags, etc.).
- Reworked InfluxDbSink struct: store unified config, auth_header, measurement/precision, metadata flags, batch size limit, and other derived fields.
- Added line-protocol escaping helpers (write_measurement, write_tag_value, write_field_string) and simplified PayloadFormat handling.
- Adjusted client initialization, connectivity checks, retry middleware setup, and improved error messages and transient vs permanent error handling.
- Updated Sink impl: open(), consume(), process/ batching, circuit breaker interactions, and close() behavior.
- Expanded and updated unit tests to cover v2/v3 config behavior, URL/precision mapping, escaping, and append_line error/success cases.
- Added new source modules and test script files related to InfluxDB connectors.

This refactor centralises version-specific behaviour, improves configurability, and prepares the connector for V3 line-protocol and auth differences.
Delete influx_dB_test_proc_docs/scripts/test-connectors.sh — an interactive Bash end-to-end test harness for InfluxDB v2/v3 connector scenarios (Iggy messaging, polling, and five connector tests). Removes helper functions, polling logic and all test cases bundled in the script.
Extract shared parsing and protocol logic into the influxdb_common crate and update sinks/sources to consume it. Introduces delegate! macros to remove repetitive variant matching, unifies URL/auth handling via InfluxDbAdapter (including V3 precision mapping), and centralises line-protocol escaping/row parsing. Optimises body construction (build_body) and Bytes usage, adds extensive unit & HTTP integration tests (axum dev-dep), and updates Cargo.toml entries accordingly to reflect the new shared dependency.
Ensure health_url trims a trailing '/' from the base URL in both V2 and V3 adapters to avoid double slashes when appending /health, and add tests verifying the behavior. Add tests that verify write_url percent-encodes bucket/org/db query parameters and that decoding recovers the original values. Improve CSV row parsing by preallocating Row with capacity based on active headers. Clean up influxdb_source Cargo.toml by removing unused csv and futures deps, add a comment explaining dashmap/once_cell are required due to macro expansion, and update the ignored list.
Refactor and harden InfluxDB connector common code: move Row type into row.rs and re-export it; make ApiVersion::from_config return Result and error on unknown values (avoid silent defaulting); make V3 precision mapping return Result and reject invalid precisions; validate sink precision early in open() to prevent silent timestamp mistakes. Add tab escaping to line-protocol writers and expand unit tests (empty inputs, tab escapes, unicode). Make CSV parser flexible for multi-table results and handle header updates. Strengthen RFC3339 cursor regex to reject out-of-range date parts. Improve test fixture container port handling to support IPv6 mappings and better error messages. Misc: minor visibility changes, JSONL format constant, Cargo description tweak, and additional tests to cover URL/health/build_query error cases.
Add validation and runtime fixes across InfluxDB connectors:

- Require timezone suffix for cursor/initial_offset timestamps to avoid UTC-vs-local ambiguity and update regex/tests accordingly.
- Validate V2 sink config to reject empty or whitespace-only orgs at open() to prevent runtime 400s.
- Validate initial_offset early in source open() and add tests for invalid/timezone-free offsets.
- Warn when a V2 Flux query lacks an explicit sort() because Skip-N dedup relies on stable ordering.
- In V3 source row processing, emit a warning when no row contains the cursor column and ensure messages are still emitted while max_cursor remains None; add tests.
- Simplify auth header and health URL construction (removed dynamic adapter usage for these paths).
- Ensure circuit breaker records successes for successful batches and move record_success into the per-batch success path; add a test to prevent tripping on intermittent failures.
- Change several atomic counter loads to SeqCst for correctness in tests and tighten an unreachable branch where precision is validated.
- Minor protocol.rs doc clarifications about tab escaping in line protocol.

Includes multiple unit/integration tests covering the new validations and circuit-breaker behavior.
core/connectors/influxdb_common: broaden CSV header detection to recognize any of `_time`, `_start`, or `_stop` so Flux window-aggregate results are parsed correctly; add tests covering _start/_stop-only headers and aggregation queries.

core/connectors/sinks/influxdb_sink: strengthen atomic orderings (use AcqRel for fetch_add and Acquire for loads) to ensure correct cross-thread visibility of counters; update tests to use Acquire loads.

core/connectors/sources/influxdb_source: derive Debug for RowProcessingResult and change process_rows to return an Err(Error::InvalidRecordValue) when no row contains the configured cursor field (instead of silently leaving max_cursor None). Update tests to expect the error — this prevents silent infinite re-delivery and surfaces misconfigured queries to the operator.
Delete the shared iggy_connector_influxdb_common crate and fold its functionality into the sink and source connectors. protocol.rs was moved/renamed into core/connectors/sinks/influxdb_sink/src/protocol.rs (helper functions made crate-private); row parsing was moved into core/connectors/sources/influxdb_source/src/row.rs and made crate-private. Adapter/config/v2/v3 logic was inlined into the respective sink/source code (URL builders, auth header generation, precision mapping, query builders, health URL checks), and relevant visibility and call sites were updated. Workspace Cargo.toml and Cargo.lock were updated to remove the member/dependency and to add CSV where needed; tests were adapted/added for the inlined helpers and validation behavior.
Implement backward-compatible deserialization for InfluxDB configs by adding custom Deserialize impls for InfluxDbSinkConfig and InfluxDbSourceConfig that default missing version to "v2" and reject unknown versions with a clear error. Add V3-specific options and safety checks: introduce include_metadata to omit the cursor field from emitted payloads, add QUERY_FORMAT_JSONL, and enforce MAX_STUCK_CAP_FACTOR (100) with validation on open to avoid extremely large queries. Make timestamp comparison conservative (return false on parse failure) to avoid skipping data. Switch message ID generation to per-message UUIDs (remove uuid_base usage), adjust payload building to filter cursor when include_metadata=false, and small sink fix to append lines without producing trailing newlines. Update and add tests covering config deserialization, timestamp behavior, stuck-cap validation, and other affected behaviors.
Various refactors and improvements to InfluxDB source/sink connectors:

- Make many config fields pub(crate) to improve encapsulation.
- Add toml as a dev-dependency for connectors and add default "version = \"v2\"" to example config.toml files.
- Introduce base_url() helpers to normalize URLs (strip trailing slashes) and use them when building endpoints; validate V2 org is non-empty in sink config.
- Introduce RowContext to consolidate per-poll parameters passed to row-processing routines; simplify signatures for process_rows and poll functions and propagate include_metadata consistently.
- Optimize per-message UUID generation by deriving IDs from a single per-poll base UUID to reduce PRNG calls.
- Add query_has_sort_call heuristic to detect Flux sort() calls (avoids false positives on identifier prefixes) and use it when checking V2 queries.
- Improve error messages for cursor_field validation to be version-specific and add related tests.
- Add comments clarifying escaping rules and rationale for using simd_json in the sink hot path.
- Update integration test TOML keys from api_version to version and add unit tests verifying TOML deserialization defaults and behavior.

These changes are focused on robustness, performance, and clearer configuration/validation behavior.
Remove the iggy_connector_redshift_sink crate (source, Cargo.toml, README) and its S3 helper; delete associated integration test configs and test files. Also remove the crate from the workspace members in Cargo.toml and unregister the redshift test module from the integration test harness to keep the test suite and workspace references consistent.
Use simd-json for JSONL parsing (clone lines into Vec<u8> for in-place parsing) and add simd-json to the source crate dependencies. Introduce DEFAULT_V2_CURSOR_FIELD and DEFAULT_V3_CURSOR_FIELD and make cursor_field handling/version validation version-strict with clearer, version-specific error messages and unit tests. Add #[must_use] annotations to precision mapping and URL builders to ensure errors are propagated, mark small writer helpers as #[inline], document the delegate! macro patterns, and tidy several docs/tests and small visibility changes (base_url -> pub(crate)).
Minor refactor and formatting changes across the InfluxDB source, v3 logic and integration fixtures, plus Cargo.lock dependency updates. Key changes:

- Reflow long argument lists and await expressions in influxdb_source lib and v3 for readability.
- Tweak validate_cursor_field error message construction and test unwrap_err formatting in common.rs.
- Use Option::is_none_or in query_has_sort_call and adjust related tests formatting.
- Simplify container port mapping in Elasticsearch and MongoDB fixtures: collapse intermediate `ports` variable into `mapped_port` and remove the IPv6 fallback mapping.
- Update Cargo.lock entries (add/remove/normalize some deps, e.g. axum, tokio, toml, simd-json; normalize lz4_flex entry and remove explicit package block).

These are mostly non-functional formatting and small API-use changes to improve code clarity and dependency resolution.
@codecov
Copy link
Copy Markdown

codecov bot commented Apr 19, 2026

Codecov Report

❌ Patch coverage is 94.55141% with 133 lines in your changes missing coverage. Please review.
✅ Project coverage is 71.11%. Comparing base (2d6562b) to head (662f35c).

Files with missing lines Patch % Lines
core/connectors/sources/influxdb_source/src/v3.rs 95.23% 18 Missing and 18 partials ⚠️
core/connectors/sources/influxdb_source/src/v2.rs 95.19% 12 Missing and 19 partials ⚠️
core/connectors/sources/influxdb_source/src/lib.rs 91.37% 18 Missing and 7 partials ⚠️
...e/connectors/sources/influxdb_source/src/common.rs 95.78% 8 Missing and 10 partials ⚠️
core/connectors/sinks/quickwit_sink/src/lib.rs 42.85% 15 Missing and 1 partial ⚠️
core/connectors/sources/influxdb_source/src/row.rs 96.06% 4 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3140      +/-   ##
============================================
- Coverage     73.17%   71.11%   -2.06%     
  Complexity      943      943              
============================================
  Files          1123     1128       +5     
  Lines         97892    95461    -2431     
  Branches      75065    72653    -2412     
============================================
- Hits          71632    67887    -3745     
- Misses        23671    24734    +1063     
- Partials       2589     2840     +251     
Components Coverage Δ
Rust Core 71.33% <94.55%> (-2.72%) ⬇️
Java SDK 62.30% <ø> (ø)
C# SDK 69.10% <ø> (-0.31%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.53% <ø> (+0.22%) ⬆️
Go SDK 39.41% <ø> (ø)
Files with missing lines Coverage Δ
core/common/src/sender/mod.rs 90.47% <ø> (ø)
core/connectors/sinks/influxdb_sink/src/lib.rs 93.84% <ø> (-0.77%) ⬇️
...ore/connectors/sinks/influxdb_sink/src/protocol.rs 100.00% <100.00%> (ø)
core/connectors/sources/influxdb_source/src/row.rs 96.06% <96.06%> (ø)
core/connectors/sinks/quickwit_sink/src/lib.rs 60.00% <42.85%> (-7.82%) ⬇️
...e/connectors/sources/influxdb_source/src/common.rs 95.78% <95.78%> (ø)
core/connectors/sources/influxdb_source/src/lib.rs 92.83% <91.37%> (+0.87%) ⬆️
core/connectors/sources/influxdb_source/src/v2.rs 95.19% <95.19%> (ø)
core/connectors/sources/influxdb_source/src/v3.rs 95.23% <95.23%> (ø)

... and 113 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Remove a duplicate lz4_flex entry from DEPENDENCIES.md and apply minor editorial fixes: normalize the markdown table header in the InfluxDB v3 architecture doc, change “mis-parsing” to “misparsing” in the InfluxDB sink protocol comment, and adjust a test comment from “Unparseable” to “Unparsable”. These are non-functional, readability/cleanliness updates.
Add 'text' to the Markdown code fence to ensure correct rendering and adjust the table header separator spacing for consistent Markdown formatting. Purely formatting changes; no functional code changes.
Reformat spacing/alignment in core/integration/tests/connectors/influxdb/sink_v3.toml and source_v3.toml. This is a whitespace-only cleanup (standardizing 'key = "value"' spacing) and does not change any configuration values or semantics.
@ryerraguntla
Copy link
Copy Markdown
Contributor Author

@hubcio - Please review at your convinience

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants