diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 60faec56fa..6e53997085 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.task; + import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -51,6 +54,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.log4j.helpers.ThreadLocalMap; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.client.registry.zookeeper.ZkAMRegistryClient; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.Preconditions; @@ -64,6 +69,7 @@ import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -517,45 +523,92 @@ public static TezChild newTezChild(Configuration conf, String host, int port, St hadoopShim); } - public static void main(String[] args) throws IOException, InterruptedException, TezException { + public static void main(String[] args) throws Exception { TezClassLoader.setupTezClassLoader(); final Configuration defaultConf = new Configuration(); + String frameworkMode = System.getenv("TEZ_FRAMEWORK_MODE"); + if (frameworkMode == null || frameworkMode.isEmpty()) { + frameworkMode = defaultConf.get("tez.framework.mode", "YARN"); + } + + String host, appId, tokenIdentifier, containerIdentifier; + int port, attemptNumber; + Credentials credentials = new Credentials(); + + if ("STANDALONE_ZOOKEEPER".equalsIgnoreCase(frameworkMode)) { + DAGProtos.ConfigurationProto confProtoBefore = TezUtilsInternal.loadConfProtoFromText(); + TezUtilsInternal.addUserSpecifiedTezConfiguration( + defaultConf, confProtoBefore.getConfKeyValuesList()); + + ZkAMRegistryClient registry = ZkAMRegistryClient.getClient(defaultConf); + registry.start(); + + while (!registry.isInitialized()) { + TimeUnit.SECONDS.sleep(5); + } + + List records = registry.getAllRecords(); + if (records.isEmpty()) { + throw new RuntimeException("No AM found in ZooKeeper registry"); + } + AMRecord amRecord = records.getFirst(); + + host = amRecord.getHostName(); + port = amRecord.getPort(); + appId = amRecord.getApplicationId().toString(); + tokenIdentifier = appId; + attemptNumber = 1; + containerIdentifier = "container_" + appId + "_01_000001"; + + // FIX: Dummy token + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(appId)); + Token sessionToken = + new Token<>(identifier, new JobTokenSecretManager(defaultConf)); + credentials.addToken(new Text("SessionToken"), sessionToken); + + LOG.info("ZK Mode: Discovered AM {} at {}:{}", appId, host, port); + + } else { + assert args.length == 5; + host = args[0]; + port = Integer.parseInt(args[1]); + containerIdentifier = args[2]; + tokenIdentifier = args[3]; + attemptNumber = Integer.parseInt(args[4]); + + DAGProtos.ConfigurationProto confProto = + TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); + TezUtilsInternal.addUserSpecifiedTezConfiguration( + defaultConf, confProto.getConfKeyValuesList()); + } Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); final String pid = System.getenv().get("JVM_PID"); + String[] localDirs = + TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); - - assert args.length == 5; - String host = args[0]; - int port = Integer.parseInt(args[1]); - final String containerIdentifier = args[2]; - final String tokenIdentifier = args[3]; - final int attemptNumber = Integer.parseInt(args[4]); - final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS - .name())); - CallerContext.setCurrent(new CallerContext.Builder("tez_"+tokenIdentifier).build()); - LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier); + CallerContext.setCurrent(new CallerContext.Builder("tez_" + tokenIdentifier).build()); + LOG.info("TezChild starting with PID={}, containerIdentifier={}", pid, containerIdentifier); if (LOG.isDebugEnabled()) { - LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port - + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber - + " tokenIdentifier: " + tokenIdentifier); + LOG.debug( + "Info from cmd line: AM-host: {} AM-port: {} containerIdentifier: {} appAttemptNumber: {} tokenIdentifier: {}", + host, + port, + containerIdentifier, + attemptNumber, + tokenIdentifier); + credentials = UserGroupInformation.getCurrentUser().getCredentials(); } // Security framework already loaded the tokens into current ugi - DAGProtos.ConfigurationProto confProto = - TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); - TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList()); UserGroupInformation.setConfiguration(defaultConf); - Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); HadoopShim hadoopShim = new HadoopShimsLoader(defaultConf).getHadoopShim(); // log the system properties if (LOG.isInfoEnabled()) { String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(defaultConf); - if (systemPropsToLog != null) { - LOG.info(systemPropsToLog); - } + LOG.info(systemPropsToLog); } TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,