Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.jdt.ls.vmargs": "-XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -Dsun.zip.disableMemoryMapping=true -Xmx8G -Xms100m -Xlog:disable"
}
54 changes: 54 additions & 0 deletions audit-server/audit-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.ranger</groupId>
<artifactId>ranger</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>

<artifactId>ranger-audit-server-common</artifactId>
<packaging>jar</packaging>
<name>Ranger Audit Server Common</name>
<description>Shared classes between audit ingestor and dispatcher</description>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ranger</groupId>
<artifactId>ranger-plugins-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>

<build>
<finalName>ranger-audit-server-common-${project.version}</finalName>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,27 @@

package org.apache.ranger.audit.server;

import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Properties;

/**
* Base configuration class for Ranger Audit Server services.
* Can be extended by specific services to load their custom configuration files.
*/
public class AuditConfig extends RangerConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(AuditConfig.class);
private static volatile AuditConfig sInstance;
public class AuditConfig extends Configuration {
private static final Logger LOG = LoggerFactory.getLogger(AuditConfig.class);

protected AuditConfig() {
public AuditConfig() {
super();
}

/**
* Get the singleton instance of AuditConfig.
* Subclasses should override this method to return their specific instance.
*/
public static AuditConfig getInstance() {
AuditConfig ret = AuditConfig.sInstance;

if (ret == null) {
synchronized (AuditConfig.class) {
ret = AuditConfig.sInstance;

if (ret == null) {
ret = new AuditConfig();
AuditConfig.sInstance = ret;
}
}
}

return ret;
}

public Properties getProperties() {
return this.getProps();
}
Expand All @@ -66,11 +48,11 @@ public Properties getProperties() {
* Add a resource file to the configuration.
* Subclasses can override to load their specific config files.
*
* @param resourcePath Path to the resource file (e.g., "conf/ranger-audit-server-site.xml")
* @param resourcePath Path to the resource file (e.g., "conf/ranger-audit-ingestor-site.xml")
* @param required Whether this resource is required
* @return true if resource was loaded successfully or is optional, false otherwise
*/
protected boolean addAuditResource(String resourcePath, boolean required) {
public boolean addAuditResource(String resourcePath, boolean required) {
LOG.debug("==> addAuditResource(path={}, required={})", resourcePath, required);

boolean success = addResourceIfReadable(resourcePath);
Expand All @@ -87,4 +69,58 @@ protected boolean addAuditResource(String resourcePath, boolean required) {

return success || !required;
}

public boolean addResourceIfReadable(String aResourceName) {
LOG.debug("==> addResourceIfReadable({})", aResourceName);

boolean ret = false;
URL fUrl = getFileLocation(aResourceName);

if (fUrl != null) {
LOG.debug("addResourceIfReadable({}): resource file is {}", aResourceName, fUrl);

try {
addResource(fUrl);

ret = true;
} catch (Exception e) {
LOG.error("Unable to load the resource name [{}]. Ignoring the resource:{}", aResourceName, fUrl);

LOG.debug("Resource loading failed for {}", fUrl, e);
}
} else {
LOG.debug("addResourceIfReadable({}): couldn't find resource file location", aResourceName);
}

LOG.debug("<== addResourceIfReadable({}), result={}", aResourceName, ret);

return ret;
}

public URL getFileLocation(String fileName) {
URL lurl = null;

if (!StringUtils.isEmpty(fileName)) {
lurl = AuditConfig.class.getClassLoader().getResource(fileName);

if (lurl == null) {
lurl = AuditConfig.class.getClassLoader().getResource("/" + fileName);
}

if (lurl == null) {
File f = new File(fileName);
if (f.exists()) {
try {
lurl = f.toURI().toURL();
} catch (MalformedURLException e) {
LOG.error("Unable to load the resource name [{}]. Ignoring the resource:{}", fileName, f.getPath());
}
} else {
LOG.debug("Conf file path {} does not exists", fileName);
}
}
}

return lurl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,41 @@ public class AuditServerConstants {
private AuditServerConstants() {}

public static final String AUDIT_SERVER_APPNAME = "ranger-audit";
public static final String PROP_PREFIX_AUDIT_SERVER = "ranger.audit.server.";
public static final String PROP_PREFIX_AUDIT_SERVER = "ranger.audit.ingestor.";
public static final String PROP_PREFIX_AUDIT_SERVER_SERVICE = PROP_PREFIX_AUDIT_SERVER + "service.";
public static final String PROP_AUTH_TO_LOCAL = PROP_PREFIX_AUDIT_SERVER + "auth.to.local";
public static final String PROP_SUFFIX_ALLOWED_USERS = ".allowed.users";

public static final String JAAS_KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
public static final String JAAS_USE_KEYTAB = "useKeyTab=true";
public static final String JAAS_KEYTAB = "keyTab=\"";
public static final String JAAS_STOKE_KEY = "storeKey=true";
public static final String JAAS_SERVICE_NAME = "serviceName=kafka";
public static final String JAAS_USER_TICKET_CACHE = "useTicketCache=false";
public static final String JAAS_PRINCIPAL = "principal=\"";
public static final String PROP_BOOTSTRAP_SERVERS = "bootstrap.servers";
public static final String PROP_TOPIC_NAME = "topic.name";
public static final String PROP_SECURITY_PROTOCOL = "security.protocol";
public static final String PROP_SASL_MECHANISM = "sasl.mechanism";
public static final String PROP_SASL_JAAS_CONFIG = "sasl.jaas.config";
public static final String PROP_SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String PROP_REQ_TIMEOUT_MS = "request.timeout.ms";
public static final String PROP_CONN_MAX_IDEAL_MS = "connections.max.idle.ms";
public static final String PROP_SOLR_DEST_PREFIX = "solr";
public static final String PROP_HDFS_DEST_PREFIX = "hdfs";
public static final String PROP_CONSUMER_THREAD_COUNT = "consumer.thread.count";
public static final String PROP_CONSUMER_OFFSET_COMMIT_STRATEGY = "consumer.offset.commit.strategy";
public static final String PROP_CONSUMER_OFFSET_COMMIT_INTERVAL = "consumer.offset.commit.interval.ms";
public static final String PROP_CONSUMER_MAX_POLL_RECORDS = "consumer.max.poll.records";
public static final String PROP_CONSUMER_SESSION_TIMEOUT_MS = "consumer.session.timeout.ms";
public static final String PROP_CONSUMER_MAX_POLL_INTERVAL_MS = "consumer.max.poll.interval.ms";
public static final String PROP_CONSUMER_HEARTBEAT_INTERVAL_MS = "consumer.heartbeat.interval.ms";
public static final String PROP_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY = "consumer.partition.assignment.strategy";
public static final String PROP_AUDIT_SERVICE_PRINCIPAL = "kerberos.principal";
public static final String PROP_AUDIT_SERVICE_KEYTAB = "kerberos.keytab";
public static final String PROP_KAFKA_PROP_PREFIX = "xasecure.audit.destination.kafka";
public static final String PROP_KAFKA_STARTUP_MAX_RETRIES = "kafka.startup.max.retries";
public static final String PROP_KAFKA_STARTUP_RETRY_DELAY_MS = "kafka.startup.retry.delay.ms";
public static final String JAAS_KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
public static final String JAAS_USE_KEYTAB = "useKeyTab=true";
public static final String JAAS_KEYTAB = "keyTab=\"";
public static final String JAAS_STOKE_KEY = "storeKey=true";
public static final String JAAS_SERVICE_NAME = "serviceName=kafka";
public static final String JAAS_USER_TICKET_CACHE = "useTicketCache=false";
public static final String JAAS_PRINCIPAL = "principal=\"";
public static final String PROP_BOOTSTRAP_SERVERS = "bootstrap.servers";
public static final String PROP_TOPIC_NAME = "topic.name";
public static final String PROP_SECURITY_PROTOCOL = "security.protocol";
public static final String PROP_SASL_MECHANISM = "sasl.mechanism";
public static final String PROP_SASL_JAAS_CONFIG = "sasl.jaas.config";
public static final String PROP_SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String PROP_REQ_TIMEOUT_MS = "request.timeout.ms";
public static final String PROP_CONN_MAX_IDEAL_MS = "connections.max.idle.ms";
public static final String PROP_SOLR_DEST_PREFIX = "solr";
public static final String PROP_HDFS_DEST_PREFIX = "hdfs";
public static final String PROP_DISPATCHER_THREAD_COUNT = "dispatcher.thread.count";
public static final String PROP_DISPATCHER_OFFSET_COMMIT_STRATEGY = "dispatcher.offset.commit.strategy";
public static final String PROP_DISPATCHER_OFFSET_COMMIT_INTERVAL = "dispatcher.offset.commit.interval.ms";
public static final String PROP_DISPATCHER_MAX_POLL_RECORDS = "dispatcher.max.poll.records";
public static final String PROP_DISPATCHER_SESSION_TIMEOUT_MS = "dispatcher.session.timeout.ms";
public static final String PROP_DISPATCHER_MAX_POLL_INTERVAL_MS = "dispatcher.max.poll.interval.ms";
public static final String PROP_DISPATCHER_HEARTBEAT_INTERVAL_MS = "dispatcher.heartbeat.interval.ms";
public static final String PROP_DISPATCHER_PARTITION_ASSIGNMENT_STRATEGY = "dispatcher.partition.assignment.strategy";
public static final String PROP_AUDIT_SERVICE_PRINCIPAL = "kerberos.principal";
public static final String PROP_AUDIT_SERVICE_KEYTAB = "kerberos.keytab";
public static final String PROP_KAFKA_PROP_PREFIX = "xasecure.audit.destination.kafka";
public static final String PROP_KAFKA_TOPIC_INIT_MAX_RETRIES = "kafka.topic.init.max.retries";
public static final String PROP_KAFKA_TOPIC_INIT_RETRY_DELAY_MS = "kafka.topic.init.retry.delay.ms";

// ranger_audits topic configuration
public static final String PROP_TOPIC_PARTITIONS = "topic.partitions";
Expand All @@ -77,18 +77,18 @@ private AuditServerConstants() {}
public static final int DEFAULT_BUFFER_PARTITIONS = 9; // For plugin-based partitioning

// Hadoop security configuration for UGI
public static final String PROP_HADOOP_AUTHENTICATION_TYPE = "hadoop.security.authentication";
public static final String PROP_HADOOP_AUTH_TYPE_KERBEROS = "kerberos";
public static final String PROP_HADOOP_KERBEROS_NAME_RULES = "hadoop.security.auth_to_local";
public static final String PROP_HADOOP_AUTHENTICATION_TYPE = "hadoop.security.authentication";
public static final String PROP_HADOOP_AUTH_TYPE_KERBEROS = "kerberos";
public static final String PROP_HADOOP_KERBEROS_NAME_RULES = "hadoop.security.auth_to_local";

// Kafka Topic and defaults
public static final String DEFAULT_TOPIC = "ranger_audits";
public static final String DEFAULT_SASL_MECHANISM = "PLAIN";
public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
public static final String DEFAULT_SERVICE_NAME = "kafka";
public static final String DEFAULT_RANGER_AUDIT_HDFS_CONSUMER_GROUP = "ranger_audit_hdfs_consumer_group";
public static final String DEFAULT_RANGER_AUDIT_SOLR_CONSUMER_GROUP = "ranger_audit_solr_consumer_group";
public static final String PROP_SECURITY_PROTOCOL_VALUE = "SASL";
public static final String DEFAULT_TOPIC = "ranger_audits";
public static final String DEFAULT_SASL_MECHANISM = "PLAIN";
public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
public static final String DEFAULT_SERVICE_NAME = "kafka";
public static final String DEFAULT_RANGER_AUDIT_HDFS_DISPATCHER_GROUP = "ranger_audit_hdfs_dispatcher_group";
public static final String DEFAULT_RANGER_AUDIT_SOLR_DISPATCHER_GROUP = "ranger_audit_solr_dispatcher_group";
public static final String PROP_SECURITY_PROTOCOL_VALUE = "SASL";

// Offset commit strategies
public static final String PROP_OFFSET_COMMIT_STRATEGY_MANUAL = "manual";
Expand All @@ -97,18 +97,18 @@ private AuditServerConstants() {}
public static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 30000; // 30 seconds
public static final int DEFAULT_MAX_POLL_RECORDS = 500; // Kafka default batch size

// Kafka consumer rebalancing timeouts (for subscribe mode)
public static final int DEFAULT_SESSION_TIMEOUT_MS = 60000; // 60 seconds - failure detection
public static final int DEFAULT_MAX_POLL_INTERVAL_MS = 300000; // 5 minutes - max processing time
public static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 10000; // 10 seconds - heartbeat frequency
// Kafka dispatcher rebalancing timeouts (for subscribe mode)
public static final int DEFAULT_SESSION_TIMEOUT_MS = 60000; // 60 seconds - failure detection
public static final int DEFAULT_MAX_POLL_INTERVAL_MS = 300000; // 5 minutes - max processing time
public static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 10000; // 10 seconds - heartbeat frequency

// Kafka consumer partition assignment strategy
public static final String DEFAULT_PARTITION_ASSIGNMENT_STRATEGY = "org.apache.kafka.clients.consumer.RangeAssignor";
// Kafka dispatcher partition assignment strategy
public static final String DEFAULT_PARTITION_ASSIGNMENT_STRATEGY = "org.apache.kafka.clients.consumer.RangeAssignor";

// Destination
public static final String DESTINATION_HDFS = "HDFS";
public static final String DESTINATION_SOLR = "SOLR";
public static final String DESTINATION_HDFS = "HDFS";
public static final String DESTINATION_SOLR = "SOLR";

// Consumer Registry Configuration
public static final String PROP_CONSUMER_CLASSES = "consumer.classes";
// Dispatcher Registry Configuration
public static final String PROP_DISPATCHER_CLASSES = "dispatcher.classes";
}
Loading
Loading