Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions phoenix-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,33 @@ public static long getMaxLookbackInMillis(Configuration conf) {

/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";

/**
* The scan attribute to enable server-side chunk formation and checksum computation for
* PhoenixSyncTableTool.
*/
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should all of these instead be named SYNC_TOOL ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have used SyncTableTool for user facing class/config. For others, I have used SyncTable, are you recommending to move all Classes and config to SyncTool instead of SyncTable i.e PhoenixSyncTableRegionScanner -> PhoenixSyncToolRegionScanner ?
I felt SyncTable is more self explainable compared to SyncTool, we can also change it to SyncTableTool at all places ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Its okay. Not a big deal. We can stick with the same naming convention.


/**
* The scan attribute to provide the target chunk size in bytes for PhoenixSyncTableTool.
*/
public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes";

/**
* The scan attribute to provide the MessageDigest state for cross-region hash continuation in
* PhoenixSyncTableTool.
*/
public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add JavaDoc on all 3 constants individually with a description of what they the attribute is and what type of value it would contain?


/**
* PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
* PhoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
* mapper). The coprocessor returns chunk metadata as HBase cells with these qualifiers, and the
* mapper parses them to extract chunk information.
*/
public static final byte[] SYNC_TABLE_START_KEY_QUALIFIER = Bytes.toBytes("START_KEY");
public static final byte[] SYNC_TABLE_HASH_QUALIFIER = Bytes.toBytes("HASH");
public static final byte[] SYNC_TABLE_ROW_COUNT_QUALIFIER = Bytes.toBytes("ROW_COUNT");
public static final byte[] SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER =
Bytes.toBytes("IS_PARTIAL_CHUNK");
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ public interface QueryServices extends SQLCloseable {

public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";

// Timeout config for PhoenixSyncTableTool
public static final String SYNC_TABLE_QUERY_TIMEOUT_ATTRIB = "phoenix.sync.table.query.timeout";
public static final String SYNC_TABLE_RPC_TIMEOUT_ATTRIB = "phoenix.sync.table.rpc.timeout";
public static final String SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB =
"phoenix.sync.table.client.scanner.timeout";
public static final String SYNC_TABLE_RPC_RETRIES_COUNTER =
"phoenix.sync.table.rpc.retries.counter";

// Retries when doing server side writes to SYSTEM.CATALOG
public static final String METADATA_WRITE_RETRIES_NUMBER = "phoenix.metadata.rpc.retries.number";
public static final String METADATA_WRITE_RETRY_PAUSE = "phoenix.metadata.rpc.pause";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ public class QueryServicesOptions {
// hrs
public static final long DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD = 30000; // 30 secs

// 30 min scan timeout * 5 tries, with 2100ms total pause time between retries
public static final long DEFAULT_SYNC_TABLE_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100;
public static final long DEFAULT_SYNC_TABLE_RPC_TIMEOUT = 30000 * 60; // 30 mins
public static final long DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
public static final int DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level

/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
* and give some room for things in the middle
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;

import java.io.IOException;
import org.bouncycastle.crypto.digests.SHA256Digest;

/**
* Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk
* bundled SHA, since their digest can't be serialized/deserialized which is needed for
* PhoenixSyncTableTool for cross-region hash continuation.
*/
public class SHA256DigestUtil {

/**
* Maximum allowed size for encoded SHA-256 digest state. BouncyCastle's SHA256Digest encoded
* state ranges from 53 to 113 bytes (52 base + 0-60 buffered words + 1 purpose byte). We allow up
* to 128 bytes as headroom.
*/
public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128;

/**
* Encodes a SHA256Digest state to a byte array.
* @param digest The digest whose state should be encoded
* @return Byte array containing the raw BouncyCastle encoded state
*/
public static byte[] encodeDigestState(SHA256Digest digest) {
byte[] encoded = digest.getEncodedState();
if (encoded.length > MAX_SHA256_DIGEST_STATE_SIZE) {
throw new IllegalArgumentException(
String.format("SHA256 encoded state too large: %d, expected <= %d", encoded.length,
MAX_SHA256_DIGEST_STATE_SIZE));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check makes no sense to me, can you explain what exactly are you protecting against? Also not sure about the exception type implies.

return encoded;
}

/**
* Decodes a SHA256Digest state from a byte array.
* @param encodedState Byte array containing BouncyCastle encoded digest state
* @return SHA256Digest restored to the saved state
* @throws IOException if state is invalid, corrupted
*/
public static SHA256Digest decodeDigestState(byte[] encodedState) throws IOException {
if (encodedState == null || encodedState.length == 0) {
throw new IllegalArgumentException(
"Invalid encoded digest state: encodedState is null or empty");
}
if (encodedState.length > MAX_SHA256_DIGEST_STATE_SIZE) {
throw new IllegalArgumentException(
String.format("Invalid SHA256 state length: %d, expected <= %d", encodedState.length,
MAX_SHA256_DIGEST_STATE_SIZE));
}
return new SHA256Digest(encodedState);
}

/**
* Decodes a digest state and finalizes it to produce the SHA-256 checksum.
* @param encodedState Serialized BouncyCastle digest state
* @return 32-byte SHA-256 hash
* @throws IOException if state decoding fails
*/
public static byte[] finalizeDigestToChecksum(byte[] encodedState) throws IOException {
SHA256Digest digest = decodeDigestState(encodedState);
return finalizeDigestToChecksum(digest);
}

/**
* Finalizes a SHA256Digest to produce the final checksum.
* @param digest The digest to finalize
* @return 32-byte SHA-256 hash
*/
public static byte[] finalizeDigestToChecksum(SHA256Digest digest) {
byte[] hash = new byte[digest.getDigestSize()];
digest.doFinal(hash, 0);
return hash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,11 @@ public static boolean isIndexRebuild(Scan scan) {
return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null;
}

public static boolean isSyncTableChunkFormationEnabled(Scan scan) {
return Arrays.equals(
scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION), TRUE_BYTES);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to also rename the attribute to indicate that it is a boolean.

}

public static int getClientVersion(Scan scan) {
int clientVersion = UNKNOWN_CLIENT_VERSION;
byte[] clientVersionBytes =
Expand Down
6 changes: 5 additions & 1 deletion phoenix-core-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,12 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Loading