Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HDFS_URL_1;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HDFS_URL_2;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
Expand All @@ -34,6 +36,7 @@

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Locale;
Expand Down Expand Up @@ -85,6 +88,7 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
private static final String CMD_INITIATE_FAILOVER = "initiate-failover";
private static final String CMD_ABORT_FAILOVER = "abort-failover";
private static final String CMD_GET_CLUSTER_ROLE_RECORD = "get-cluster-role-record";
private static final String CMD_CREATE = "create";

// Common options
private static final Option HELP_OPT = new Option("h", "help", false, "Show help");
Expand Down Expand Up @@ -137,6 +141,33 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
private static final Option TIMEOUT_OPT = new Option("t", "timeout", true,
"Timeout in seconds to wait for state transitions (default: 120)");

// Create command options (symmetric slot-based, no local/peer distinction)
// NOTE: every option in this group, including the HDFS URLs, is parsed via
// getRequiredOption() in executeCreate — i.e. all of them are mandatory for the
// create command.
// Cluster 1
private static final Option ZK_URL_1_OPT =
new Option("zk1", "zk-url-1", true, "ZK URL for cluster 1");

private static final Option CLUSTER_URL_1_OPT =
new Option("c1", "cluster-url-1", true, "HBase cluster URL for cluster 1");

private static final Option CLUSTER_ROLE_1_OPT =
new Option("cr1", "cluster-role-1", true, "Cluster role for cluster 1 (e.g., ACTIVE)");

private static final Option HDFS_URL_1_OPT =
new Option("hdfs1", "hdfs-url-1", true, "HDFS URL for cluster 1");

// Cluster 2
private static final Option ZK_URL_2_OPT =
new Option("zk2", "zk-url-2", true, "ZK URL for cluster 2");

private static final Option CLUSTER_URL_2_OPT =
new Option("c2", "cluster-url-2", true, "HBase cluster URL for cluster 2");

private static final Option CLUSTER_ROLE_2_OPT =
new Option("cr2", "cluster-role-2", true, "Cluster role for cluster 2 (e.g., STANDBY)");

private static final Option HDFS_URL_2_OPT =
new Option("hdfs2", "hdfs-url-2", true, "HDFS URL for cluster 2");

@Override
public int run(String[] args) throws Exception {
if (args.length == 0) {
Expand Down Expand Up @@ -177,6 +208,12 @@ public int run(String[] args) throws Exception {
// Retrieves and displays cluster role record for HA group
// Required: --ha-group
return executeGetClusterRoleRecord(commandArgs);
case CMD_CREATE:
// Creates a new HA group entry in SYSTEM.HA_GROUP (idempotent)
// Required: --ha-group, --policy, --zk-url-1, --cluster-url-1, --cluster-role-1,
// --zk-url-2, --cluster-url-2, --cluster-role-2, --hdfs-url-1, --hdfs-url-2
// Optional: --admin-version, --dry-run
return executeCreate(commandArgs);
default:
System.err.println("Unknown command: " + command);
printUsage();
Expand Down Expand Up @@ -780,6 +817,168 @@ private int executeGetClusterRoleRecord(String[] args) throws Exception {
}
}

/**
 * Creates a new HA group entry in SYSTEM.HA_GROUP. Idempotent: if the group already exists,
 * prints a skip message and returns success without modifying the existing row. The ZK znode is
 * initialized automatically on first access by HAGroupStoreClient. Run the same command on both
 * clusters.
 * @param args create-command arguments; all slot options, including the HDFS URLs, are required
 * @return RET_SUCCESS when the row is created (or already exists, or on --dry-run / --help),
 *         RET_ARGUMENT_ERROR for unparsable or invalid arguments, RET_UPDATE_ERROR for any
 *         other failure
 */
private int executeCreate(String[] args) throws Exception {
  try {
    CommandLine cmdLine = new DefaultParser().parse(createCreateOptions(), args);

    if (cmdLine.hasOption(HELP_OPT.getOpt())) {
      printCreateHelp();
      return RET_SUCCESS;
    }

    String haGroupName = getRequiredOption(cmdLine, HA_GROUP_OPT, "HA group name");
    String policy = getRequiredOption(cmdLine, POLICY_OPT, "policy");
    String zkUrl1 = getRequiredOption(cmdLine, ZK_URL_1_OPT, "ZK URL for cluster 1");
    String clusterUrl1 =
      getRequiredOption(cmdLine, CLUSTER_URL_1_OPT, "cluster URL for cluster 1");
    String clusterRole1Str =
      getRequiredOption(cmdLine, CLUSTER_ROLE_1_OPT, "cluster role for cluster 1");
    String zkUrl2 = getRequiredOption(cmdLine, ZK_URL_2_OPT, "ZK URL for cluster 2");
    String clusterUrl2 =
      getRequiredOption(cmdLine, CLUSTER_URL_2_OPT, "cluster URL for cluster 2");
    String clusterRole2Str =
      getRequiredOption(cmdLine, CLUSTER_ROLE_2_OPT, "cluster role for cluster 2");

    // HDFS URLs are mandatory too, slot-symmetric with the other cluster options.
    String hdfsUrl1 = getRequiredOption(cmdLine, HDFS_URL_1_OPT, "HDFS URL for cluster 1");
    String hdfsUrl2 = getRequiredOption(cmdLine, HDFS_URL_2_OPT, "HDFS URL for cluster 2");
    final boolean dryRun = cmdLine.hasOption(DRY_RUN_OPT.getOpt());

    // Initial admin version defaults to 1; a non-numeric value throws
    // NumberFormatException (an IllegalArgumentException), handled below as an argument error.
    long adminVersion = 1L;
    String adminVersionStr = cmdLine.getOptionValue(ADMIN_VERSION_OPT.getOpt());
    if (adminVersionStr != null) {
      adminVersion = Long.parseLong(adminVersionStr);
    }

    ClusterRole clusterRole1 = parseClusterRole(clusterRole1Str);
    ClusterRole clusterRole2 = parseClusterRole(clusterRole2Str);

    String localZkUrl = getLocalZkUrl(getConf());

    // Idempotency: an existing row is left untouched and reported as success.
    if (haGroupExistsInSystemTable(haGroupName, localZkUrl)) {
      System.out.println("HA group '" + haGroupName
        + "' already exists in SYSTEM.HA_GROUP. Skipping creation. Use update command to modify it.");
      return RET_SUCCESS;
    }

    // Echo the full configuration before touching anything, so --dry-run shows it too.
    System.out.println("\n=== HA Group to Create ===\n");
    System.out.println(String.format(" %-25s: %s", "HA Group Name", haGroupName));
    System.out.println(String.format(" %-25s: %s", "Policy", policy));
    System.out.println(String.format(" %-25s: %s", "ZK URL 1", zkUrl1));
    System.out.println(String.format(" %-25s: %s", "Cluster URL 1", clusterUrl1));
    System.out.println(String.format(" %-25s: %s", "Cluster Role 1", clusterRole1));
    System.out.println(String.format(" %-25s: %s", "ZK URL 2", zkUrl2));
    System.out.println(String.format(" %-25s: %s", "Cluster URL 2", clusterUrl2));
    System.out.println(String.format(" %-25s: %s", "Cluster Role 2", clusterRole2));
    System.out.println(String.format(" %-25s: %s", "HDFS URL 1", hdfsUrl1));
    System.out.println(String.format(" %-25s: %s", "HDFS URL 2", hdfsUrl2));
    System.out.println(String.format(" %-25s: %d", "Admin Version", adminVersion));
    System.out.println();

    if (dryRun) {
      System.out.println("\n\u2713 Dry-run completed. No changes applied.");
      return RET_SUCCESS;
    }

    insertIntoSystemTable(haGroupName, policy, zkUrl1, clusterUrl1, clusterRole1, hdfsUrl1,
      zkUrl2, clusterUrl2, clusterRole2, hdfsUrl2, adminVersion, localZkUrl);

    System.out.println(" \u2713 SYSTEM.HA_GROUP entry created.");

    // Reading the record forces HAGroupStoreClient to initialize the ZK znode eagerly,
    // letting us report its state (or warn if initialization came back empty).
    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(getConf());
    Optional<HAGroupStoreRecord> record = manager.getHAGroupStoreRecord(haGroupName);
    if (record.isPresent()) {
      System.out
        .println(" \u2713 Znode initialized (state: " + record.get().getHAGroupState() + ").");
    } else {
      System.err.println(
        " \u26a0 Znode initialization returned empty record for '" + haGroupName + "'.");
    }

    System.out.println("\n\u2713 HA group '" + haGroupName + "' created successfully.");
    return RET_SUCCESS;

  } catch (org.apache.commons.cli.ParseException e) {
    // FIX: unparsable/unknown command-line options thrown by DefaultParser.parse previously
    // fell through to the generic Exception handler and returned RET_UPDATE_ERROR; they are
    // argument errors and must return RET_ARGUMENT_ERROR like other bad input.
    System.err.println("\n\u2717 Invalid argument: " + e.getMessage());
    return RET_ARGUMENT_ERROR;
  } catch (IllegalArgumentException e) {
    System.err.println("\n\u2717 Invalid argument: " + e.getMessage());
    return RET_ARGUMENT_ERROR;
  } catch (Exception e) {
    System.err.println("\n\u2717 Create failed: " + e.getMessage());
    LOG.error("Create command failed", e);
    return RET_UPDATE_ERROR;
  }
}

/**
 * Check whether a row for the given HA group name already exists in SYSTEM.HA_GROUP.
 * Opens a short-lived Phoenix connection against {@code localZkUrl} and runs a
 * parameterized COUNT query.
 * @param haGroupName HA group name to look up
 * @param localZkUrl ZK quorum URL of the local cluster
 * @return true if a matching row exists, false otherwise
 * @throws SQLException on any JDBC failure
 */
private boolean haGroupExistsInSystemTable(String haGroupName, String localZkUrl)
  throws SQLException {
  final String query =
    "SELECT COUNT(*) FROM " + SYSTEM_HA_GROUP_NAME + " WHERE " + HA_GROUP_NAME + " = ?";
  final String jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + localZkUrl;
  try (
    PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(jdbcUrl);
    PreparedStatement stmt = conn.prepareStatement(query)) {
    stmt.setString(1, haGroupName);
    try (ResultSet rs = stmt.executeQuery()) {
      // COUNT(*) always yields one row; guard anyway and treat "no row" as absent.
      return rs.next() && rs.getLong(1) > 0;
    }
  }
}

/**
 * Insert a new HA group row into SYSTEM.HA_GROUP using symmetric slot-based columns.
 * Executes a parameterized UPSERT over a short-lived Phoenix connection against
 * {@code localZkUrl} and commits it.
 * @throws SQLException on any JDBC failure
 */
private void insertIntoSystemTable(String haGroupName, String policy, String zkUrl1,
  String clusterUrl1, ClusterRole clusterRole1, String hdfsUrl1, String zkUrl2,
  String clusterUrl2, ClusterRole clusterRole2, String hdfsUrl2, long adminVersion,
  String localZkUrl) throws SQLException {

  String insertQuery = "UPSERT INTO " + SYSTEM_HA_GROUP_NAME + " (" + HA_GROUP_NAME + ", "
    + POLICY + ", " + ZK_URL_1 + ", " + CLUSTER_URL_1 + ", " + CLUSTER_ROLE_1 + ", " + HDFS_URL_1
    + ", " + ZK_URL_2 + ", " + CLUSTER_URL_2 + ", " + CLUSTER_ROLE_2 + ", " + HDFS_URL_2 + ", "
    + VERSION + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

  // String-valued bind parameters in column-declaration order; VERSION (long) is bound last.
  String[] stringParams = { haGroupName, policy, zkUrl1, clusterUrl1, clusterRole1.name(),
    hdfsUrl1, zkUrl2, clusterUrl2, clusterRole2.name(), hdfsUrl2 };

  final String jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + localZkUrl;
  try (
    PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(jdbcUrl);
    PreparedStatement stmt = conn.prepareStatement(insertQuery)) {
    int pos = 1;
    for (String value : stringParams) {
      stmt.setString(pos++, value);
    }
    stmt.setLong(pos, adminVersion);
    stmt.executeUpdate();
    conn.commit();
  }
}

/**
 * Parse a ClusterRole from a string value, case-insensitively.
 * @param role textual role name in any case (e.g. "active", "ACTIVE")
 * @return the matching ClusterRole
 * @throws IllegalArgumentException if {@code role} matches no ClusterRole; the message lists
 *         the valid roles and the original exception is preserved as the cause
 */
private ClusterRole parseClusterRole(String role) {
  try {
    // FIX: use Locale.ROOT so uppercasing is locale-independent; the default
    // toUpperCase() maps "i" to a dotted capital I under e.g. the Turkish locale,
    // which would make "active" fail to match ACTIVE.
    return ClusterRole.valueOf(role.toUpperCase(Locale.ROOT));
  } catch (IllegalArgumentException e) {
    // Preserve the original exception as the cause instead of dropping it.
    throw new IllegalArgumentException("Invalid cluster role: " + role + "\nValid roles: "
      + Arrays.stream(ClusterRole.values()).map(Enum::name).collect(Collectors.joining(", ")),
      e);
  }
}

/**
* Read current admin version from ZK and increment by 1
*/
Expand Down Expand Up @@ -1226,6 +1425,17 @@ private void printHAGroupStoreRecord(HAGroupStoreRecord record) {
}
}

/**
 * Build the commons-cli Options for the create command: help, HA group identity,
 * both cluster slots, HDFS URLs, admin version, and dry-run.
 */
private static Options createCreateOptions() {
  Options options = new Options();
  options.addOption(HELP_OPT);
  options.addOption(HA_GROUP_OPT);
  options.addOption(POLICY_OPT);
  // Cluster slot 1
  options.addOption(ZK_URL_1_OPT);
  options.addOption(CLUSTER_URL_1_OPT);
  options.addOption(CLUSTER_ROLE_1_OPT);
  // Cluster slot 2
  options.addOption(ZK_URL_2_OPT);
  options.addOption(CLUSTER_URL_2_OPT);
  options.addOption(CLUSTER_ROLE_2_OPT);
  // HDFS URLs and modifiers
  options.addOption(HDFS_URL_1_OPT);
  options.addOption(HDFS_URL_2_OPT);
  options.addOption(ADMIN_VERSION_OPT);
  options.addOption(DRY_RUN_OPT);
  return options;
}

/**
* Create options for update command
*/
Expand Down Expand Up @@ -1324,6 +1534,8 @@ private void printUsage() {
System.out.println("Usage: phoenix-consistentha-admin-tool <command> [options]");
System.out.println();
System.out.println("Commands:");
System.out
.println(" create Create a new HA group in SYSTEM.HA_GROUP (idempotent)");
System.out.println(" update Update HA group configuration");
System.out.println(" get Show HA group configuration");
System.out.println(" list List all HA groups");
Expand All @@ -1335,6 +1547,10 @@ private void printUsage() {
.println("Run 'phoenix-consistentha-admin-tool <command> --help' for command-specific help");
System.out.println();
System.out.println("Examples:");
System.out.println(" phoenix-consistentha-admin-tool create -g myHAGroup -p FAILOVER"
+ " -zk1 my-zk:2181:/hbase -c1 my-hmaster:16000 -cr1 ACTIVE"
+ " -zk2 my-zk:2181:/hbase -c2 my-hmaster:16000 -cr2 STANDBY"
+ " -hdfs1 hdfs://host1:8020 -hdfs2 hdfs://host2:8020");
System.out.println(
" phoenix-consistentha-admin-tool update -g myHAGroup " + "-pz newhost:2181:/hbase -v 5");
System.out.println(" phoenix-consistentha-admin-tool get -g myHAGroup");
Expand All @@ -1345,6 +1561,50 @@ private void printUsage() {
System.out.println();
}

/**
 * Print create command help.
 * <p>
 * Note: the example includes the HDFS URL options because executeCreate parses them with
 * getRequiredOption — the previous example omitted them and would fail as written.
 */
private void printCreateHelp() {
  System.out.println();
  System.out.println("Usage: phoenix-consistentha-admin-tool create [options]");
  System.out.println();
  System.out.println("Description:");
  System.out.println(" Creates a new HA group entry in SYSTEM.HA_GROUP. The ZK znode is");
  System.out.println(" initialized automatically on first access. This command is idempotent:");
  System.out.println(" running it again on an existing HA group prints a skip message and");
  System.out.println(" returns success. Run the same command on both clusters.");
  System.out.println();
  System.out.println("REQUIRED:");
  System.out.println(" -g, --ha-group <name> HA group name");
  System.out.println(" -p, --policy <policy> HA policy (e.g., FAILOVER)");
  System.out.println(" -zk1, --zk-url-1 <url> ZK URL for cluster 1");
  System.out.println(" -c1, --cluster-url-1 <url> HBase cluster URL for cluster 1");
  System.out.println(" -cr1, --cluster-role-1 <role> Cluster role for cluster 1");
  System.out.println(" -zk2, --zk-url-2 <url> ZK URL for cluster 2");
  System.out.println(" -c2, --cluster-url-2 <url> HBase cluster URL for cluster 2");
  System.out.println(" -cr2, --cluster-role-2 <role> Cluster role for cluster 2");
  System.out.println();
  System.out.println(" -hdfs1, --hdfs-url-1 <url> HDFS URL for cluster 1");
  System.out.println(" -hdfs2, --hdfs-url-2 <url> HDFS URL for cluster 2");
  System.out.println();
  System.out.println("OPTIONAL:");
  System.out.println(" -v, --admin-version <ver> Initial admin version (default: 1)");
  System.out.println(" -d, --dry-run Show what would be created");
  System.out.println(" -h, --help Show this help");
  System.out.println();
  System.out.println("Valid Cluster Roles:");
  System.out.println(
    " " + Arrays.stream(ClusterRole.values()).map(Enum::name).collect(Collectors.joining(", ")));
  System.out.println();
  System.out.println("Example:");
  // FIX: include the required HDFS URL options so the example actually runs.
  System.out.println(" phoenix-consistentha-admin-tool create -g myHAGroup -p FAILOVER \\");
  System.out.println(" -zk1 my-zk:2181:/hbase -c1 my-hmaster:16000 -cr1 ACTIVE \\");
  System.out.println(" -zk2 my-zk:2181:/hbase -c2 my-hmaster:16000 -cr2 STANDBY \\");
  System.out.println(" -hdfs1 hdfs://host1:8020 -hdfs2 hdfs://host2:8020");
  System.out.println();
  System.out.println("Note: Run this same command on both clusters.");
  System.out.println();
}

/**
* Print update command help
*/
Expand Down
Loading