hadoop JobClient 源码
haddop JobClient 代码
文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* <code>JobClient</code> is the primary interface for the user-job to interact
* with the cluster.
*
* <code>JobClient</code> provides facilities to submit jobs, track their
* progress, access component-tasks' reports/logs, get the Map-Reduce cluster
* status information etc.
*
* <p>The job submission process involves:
* <ol>
* <li>
* Checking the input and output specifications of the job.
* </li>
* <li>
* Computing the {@link InputSplit}s for the job.
* </li>
* <li>
* Setup the requisite accounting information for the {@link DistributedCache}
* of the job, if necessary.
* </li>
* <li>
* Copying the job's jar and configuration to the map-reduce system directory
* on the distributed file-system.
* </li>
* <li>
* Submitting the job to the cluster and optionally monitoring
* it's status.
* </li>
* </ol>
*
* Normally the user creates the application, describes various facets of the
* job via {@link JobConf} and then uses the <code>JobClient</code> to submit
* the job and monitor its progress.
*
* <p>Here is an example on how to use <code>JobClient</code>:</p>
* <p><blockquote><pre>
* // Create a new JobConf
* JobConf job = new JobConf(new Configuration(), MyJob.class);
*
* // Specify various job-specific parameters
* job.setJobName("myjob");
*
* job.setInputPath(new Path("in"));
* job.setOutputPath(new Path("out"));
*
* job.setMapperClass(MyJob.MyMapper.class);
* job.setReducerClass(MyJob.MyReducer.class);
*
* // Submit the job, then poll for progress until the job is complete
* JobClient.runJob(job);
* </pre></blockquote>
*
* <b id="JobControl">Job Control</b>
*
* <p>At times clients would chain map-reduce jobs to accomplish complex tasks
* which cannot be done via a single map-reduce job. This is fairly easy since
* the output of the job, typically, goes to distributed file-system and that
* can be used as the input for the next job.</p>
*
* <p>However, this also means that the onus on ensuring jobs are complete
* (success/failure) lies squarely on the clients. In such situations the
* various job-control options are:
* <ol>
* <li>
* {@link #runJob(JobConf)} : submits the job and returns only after
* the job has completed.
* </li>
* <li>
* {@link #submitJob(JobConf)} : only submits the job, then poll the
* returned handle to the {@link RunningJob} to query status and make
* scheduling decisions.
* </li>
* <li>
* {@link JobConf#setJobEndNotificationURI(String)} : setup a notification
* on job-completion, thus avoiding polling.
* </li>
* </ol>
*
* @see JobConf
* @see ClusterStatus
* @see Tool
* @see DistributedCache
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI implements AutoCloseable {
@InterfaceAudience.Private
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
"mapreduce.jobclient.retry.policy.enabled";
@InterfaceAudience.Private
public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
false;
@InterfaceAudience.Private
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
"mapreduce.jobclient.retry.policy.spec";
@InterfaceAudience.Private
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
"10000,6,60000,10"; // t1,n1,t2,n2,...
public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
private long retryInterval =
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;
static{
ConfigUtil.loadResources();
}
/**
* A NetworkedJob is an implementation of RunningJob. It holds
* a JobProfile object to provide some info, and interacts with the
* remote service to provide certain functionality.
*/
static class NetworkedJob implements RunningJob {
Job job;
/**
* We store a JobProfile and a timestamp for when we last
* acquired the job profile. If the job is null, then we cannot
* perform any of the tasks. The job might be null if the cluster
* has completely forgotten about the job. (eg, 24 hours after the
* job completes.)
*/
public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
this(status, cluster, new JobConf(status.getJobFile()));
}
private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
throws IOException {
this(Job.getInstance(cluster, status, conf));
}
public NetworkedJob(Job job) throws IOException {
this.job = job;
}
public Configuration getConfiguration() {
return job.getConfiguration();
}
/**
* An identifier for the job
*/
public JobID getID() {
return JobID.downgrade(job.getJobID());
}
/** @deprecated This method is deprecated and will be removed. Applications should
* rather use {@link #getID()}.*/
@Deprecated
public String getJobID() {
return getID().toString();
}
/**
* The user-specified job name
*/
public String getJobName() {
return job.getJobName();
}
/**
* The name of the job file
*/
public String getJobFile() {
return job.getJobFile();
}
/**
* A URL where the job's status can be seen
*/
public String getTrackingURL() {
return job.getTrackingURL();
}
/**
* A float between 0.0 and 1.0, indicating the % of map work
* completed.
*/
public float mapProgress() throws IOException {
return job.mapProgress();
}
/**
* A float between 0.0 and 1.0, indicating the % of reduce work
* completed.
*/
public float reduceProgress() throws IOException {
return job.reduceProgress();
}
/**
* A float between 0.0 and 1.0, indicating the % of cleanup work
* completed.
*/
public float cleanupProgress() throws IOException {
try {
return job.cleanupProgress();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* A float between 0.0 and 1.0, indicating the % of setup work
* completed.
*/
public float setupProgress() throws IOException {
return job.setupProgress();
}
/**
* Returns immediately whether the whole job is done yet or not.
*/
public synchronized boolean isComplete() throws IOException {
return job.isComplete();
}
/**
* True iff job completed successfully.
*/
public synchronized boolean isSuccessful() throws IOException {
return job.isSuccessful();
}
/**
* Blocks until the job is finished
*/
public void waitForCompletion() throws IOException {
try {
job.waitForCompletion(false);
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException ce) {
throw new IOException(ce);
}
}
/**
* Tells the service to get the state of the current job.
*/
public synchronized int getJobState() throws IOException {
try {
return job.getJobState().getValue();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Tells the service to terminate the current job.
*/
public synchronized void killJob() throws IOException {
job.killJob();
}
/** Set the priority of the job.
* @param priority new priority of the job.
*/
public synchronized void setJobPriority(String priority)
throws IOException {
try {
job.setPriority(
org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Kill indicated task attempt.
* @param taskId the id of the task to kill.
* @param shouldFail if true the task is failed and added to failed tasks list, otherwise
* it is just killed, w/o affecting job failure status.
*/
public synchronized void killTask(TaskAttemptID taskId,
boolean shouldFail) throws IOException {
if (shouldFail) {
job.failTask(taskId);
} else {
job.killTask(taskId);
}
}
/** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
@Deprecated
public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
killTask(TaskAttemptID.forName(taskId), shouldFail);
}
/**
* Fetch task completion events from cluster for this job.
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
int startFrom) throws IOException {
try {
org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls =
job.getTaskCompletionEvents(startFrom, 10);
TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length];
for (int i = 0 ; i < acls.length; i++ ) {
ret[i] = TaskCompletionEvent.downgrade(acls[i]);
}
return ret;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Dump stats to screen
*/
@Override
public String toString() {
return job.toString();
}
/**
* Returns the counters for this job
*/
public Counters getCounters() throws IOException {
Counters result = null;
org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
if(temp != null) {
result = Counters.downgrade(temp);
}
return result;
}
@Override
public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
try {
return job.getTaskDiagnostics(id);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
public String getHistoryUrl() throws IOException {
try {
return job.getHistoryUrl();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
public boolean isRetired() throws IOException {
try {
return job.isRetired();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
boolean monitorAndPrintJob() throws IOException, InterruptedException {
return job.monitorAndPrintJob();
}
@Override
public String getFailureInfo() throws IOException {
try {
return job.getStatus().getFailureInfo();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
@Override
public JobStatus getJobStatus() throws IOException {
try {
return JobStatus.downgrade(job.getStatus());
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
}
/**
* Ugi of the client. We store this ugi when the client is created and
* then make sure that the same ugi is used to run the various protocols.
*/
UserGroupInformation clientUgi;
/**
* Create a job client.
*/
public JobClient() {
}
/**
* Build a job client with the given {@link JobConf}, and connect to the
* default cluster
*
* @param conf the job configuration.
* @throws IOException
*/
public JobClient(JobConf conf) throws IOException {
init(conf);
}
/**
* Build a job client with the given {@link Configuration},
* and connect to the default cluster
*
* @param conf the configuration.
* @throws IOException
*/
public JobClient(Configuration conf) throws IOException {
init(new JobConf(conf));
}
/**
* Connect to the default cluster
* @param conf the job configuration.
* @throws IOException
*/
public void init(JobConf conf) throws IOException {
setConf(conf);
cluster = new Cluster(conf);
clientUgi = UserGroupInformation.getCurrentUser();
maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
retryInterval =
conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
}
/**
* Build a job client, connect to the indicated job tracker.
*
* @param jobTrackAddr the job tracker to connect to.
* @param conf configuration.
*/
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
cluster = new Cluster(jobTrackAddr, conf);
clientUgi = UserGroupInformation.getCurrentUser();
}
/**
* Close the <code>JobClient</code>.
*/
@Override
public synchronized void close() throws IOException {
cluster.close();
}
/**
* Get a filesystem handle. We need this to prepare jobs
* for submission to the MapReduce system.
*
* @return the filesystem handle.
*/
public synchronized FileSystem getFs() throws IOException {
try {
return cluster.getFileSystem();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Get a handle to the Cluster
*/
public Cluster getClusterHandle() {
return cluster;
}
/**
* Submit a job to the MR system.
*
* This returns a handle to the {@link RunningJob} which can be used to track
* the running-job.
*
* @param jobFile the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
* running-job.
* @throws FileNotFoundException
* @throws InvalidJobConfException
* @throws IOException
*/
public RunningJob submitJob(String jobFile) throws FileNotFoundException,
InvalidJobConfException,
IOException {
// Load in the submitted job details
JobConf job = new JobConf(jobFile);
return submitJob(job);
}
/**
* Submit a job to the MR system.
* This returns a handle to the {@link RunningJob} which can be used to track
* the running-job.
*
* @param conf the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
* running-job.
* @throws FileNotFoundException
* @throws IOException
*/
public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
IOException {
return submitJobInternal(conf);
}
@InterfaceAudience.Private
public RunningJob submitJobInternal(final JobConf conf)
throws FileNotFoundException, IOException {
try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@Override
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
Job job = Job.getInstance(conf);
job.submit();
return job;
}
});
Cluster prev = cluster;
// update our Cluster instance with the one created by Job for submission
// (we can't pass our Cluster instance to Job, since Job wraps the config
// instance, and the two configs would then diverge)
cluster = job.getCluster();
// It is important to close the previous cluster instance
// to cleanup resources.
if (prev != null) {
prev.close();
}
return new NetworkedJob(job);
} catch (InterruptedException ie) {
throw new IOException("interrupted", ie);
}
}
private Job getJobUsingCluster(final JobID jobid) throws IOException,
InterruptedException {
return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException, InterruptedException {
return cluster.getJob(jobid);
}
});
}
protected RunningJob getJobInner(final JobID jobid) throws IOException {
try {
Job job = getJobUsingCluster(jobid);
if (job != null) {
JobStatus status = JobStatus.downgrade(job.getStatus());
if (status != null) {
return new NetworkedJob(status, cluster,
new JobConf(job.getConfiguration()));
}
}
} catch (InterruptedException ie) {
throw new IOException(ie);
}
return null;
}
/**
* Get an {@link RunningJob} object to track an ongoing job. Returns
* null if the id does not correspond to any known job.
*
* @param jobid the jobid of the job.
* @return the {@link RunningJob} handle to track the job, null if the
* <code>jobid</code> doesn't correspond to any known job.
* @throws IOException
*/
public RunningJob getJob(final JobID jobid) throws IOException {
for (int i = 0;i <= maxRetry;i++) {
if (i > 0) {
try {
Thread.sleep(retryInterval);
} catch (Exception e) { }
}
RunningJob job = getJobInner(jobid);
if (job != null) {
return job;
}
}
return null;
}
/**@deprecated Applications should rather use {@link #getJob(JobID)}.
*/
@Deprecated
public RunningJob getJob(String jobid) throws IOException {
return getJob(JobID.forName(jobid));
}
private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
/**
* Get the information of the current state of the map tasks of a job.
*
* @param jobId the job to query.
* @return the list of all of the map tips.
* @throws IOException
*/
public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
return getTaskReports(jobId, TaskType.MAP);
}
private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws
IOException {
try {
Job j = getJobUsingCluster(jobId);
if(j == null) {
return EMPTY_TASK_REPORTS;
}
return TaskReport.downgradeArray(j.getTaskReports(type));
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
@Deprecated
public TaskReport[] getMapTaskReports(String jobId) throws IOException {
return getMapTaskReports(JobID.forName(jobId));
}
/**
* Get the information of the current state of the reduce tasks of a job.
*
* @param jobId the job to query.
* @return the list of all of the reduce tips.
* @throws IOException
*/
public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
return getTaskReports(jobId, TaskType.REDUCE);
}
/**
* Get the information of the current state of the cleanup tasks of a job.
*
* @param jobId the job to query.
* @return the list of all of the cleanup tips.
* @throws IOException
*/
public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
return getTaskReports(jobId, TaskType.JOB_CLEANUP);
}
/**
* Get the information of the current state of the setup tasks of a job.
*
* @param jobId the job to query.
* @return the list of all of the setup tips.
* @throws IOException
*/
public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
return getTaskReports(jobId, TaskType.JOB_SETUP);
}
/**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
@Deprecated
public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
return getReduceTaskReports(JobID.forName(jobId));
}
/**
* Display the information about a job's tasks, of a particular type and
* in a particular state
*
* @param jobId the ID of the job
* @param type the type of the task (map/reduce/setup/cleanup)
* @param state the state of the task
* (pending/running/completed/failed/killed)
* @throws IOException when there is an error communicating with the master
* @throws IllegalArgumentException if an invalid type/state is passed
*/
public void displayTasks(final JobID jobId, String type, String state)
throws IOException {
try {
Job job = getJobUsingCluster(jobId);
super.displayTasks(job, type, state);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Get status information about the Map-Reduce cluster.
*
* @return the status information about the Map-Reduce cluster as an object
* of {@link ClusterStatus}.
* @throws IOException
*/
public ClusterStatus getClusterStatus() throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
public ClusterStatus run() throws IOException, InterruptedException {
ClusterMetrics metrics = cluster.getClusterStatus();
return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
.getBlackListedTaskTrackerCount(), cluster
.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
metrics.getDecommissionedTaskTrackerCount(), metrics
.getGrayListedTaskTrackerCount());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
Collection<String> list = new ArrayList<String>();
for (TaskTrackerInfo info: objs) {
list.add(info.getTaskTrackerName());
}
return list;
}
private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
for (TaskTrackerInfo info: objs) {
BlackListInfo binfo = new BlackListInfo();
binfo.setTrackerName(info.getTaskTrackerName());
binfo.setReasonForBlackListing(info.getReasonForBlacklist());
binfo.setBlackListReport(info.getBlacklistReport());
list.add(binfo);
}
return list;
}
/**
* Get status information about the Map-Reduce cluster.
*
* @param detailed if true then get a detailed status including the
* tracker names
* @return the status information about the Map-Reduce cluster as an object
* of {@link ClusterStatus}.
* @throws IOException
*/
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
public ClusterStatus run() throws IOException, InterruptedException {
ClusterMetrics metrics = cluster.getClusterStatus();
return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
metrics.getReduceSlotCapacity(),
cluster.getJobTrackerStatus());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Get the jobs that are not completed and not failed.
*
* @return array of {@link JobStatus} for the running/to-be-run jobs.
* @throws IOException
*/
public JobStatus[] jobsToComplete() throws IOException {
List<JobStatus> stats = new ArrayList<JobStatus>();
for (JobStatus stat : getAllJobs()) {
if (!stat.isJobComplete()) {
stats.add(stat);
}
}
return stats.toArray(new JobStatus[0]);
}
/**
* Get the jobs that are submitted.
*
* @return array of {@link JobStatus} for the submitted jobs.
* @throws IOException
*/
public JobStatus[] getAllJobs() throws IOException {
try {
org.apache.hadoop.mapreduce.JobStatus[] jobs =
clientUgi.doAs(new PrivilegedExceptionAction<
org.apache.hadoop.mapreduce.JobStatus[]> () {
public org.apache.hadoop.mapreduce.JobStatus[] run()
throws IOException, InterruptedException {
return cluster.getAllJobStatuses();
}
});
JobStatus[] stats = new JobStatus[jobs.length];
for (int i = 0; i < jobs.length; i++) {
stats[i] = JobStatus.downgrade(jobs[i]);
}
return stats;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Utility that submits a job, then polls for progress until the job is
* complete.
*
* @param job the job configuration.
* @throws IOException if the job fails
*/
public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
RunningJob rj = jc.submitJob(job);
try {
if (!jc.monitorAndPrintJob(job, rj)) {
throw new IOException("Job failed!");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return rj;
}
/**
* Monitor a job and print status in real-time as progress is made and tasks
* fail.
* @param conf the job's configuration
* @param job the job to track
* @return true if the job succeeded
* @throws IOException if communication to the JobTracker fails
*/
public boolean monitorAndPrintJob(JobConf conf,
RunningJob job
) throws IOException, InterruptedException {
return ((NetworkedJob)job).monitorAndPrintJob();
}
static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
}
static Configuration getConfiguration(String jobTrackerSpec)
{
Configuration conf = new Configuration();
if (jobTrackerSpec != null) {
if (jobTrackerSpec.indexOf(":") >= 0) {
conf.set("mapred.job.tracker", jobTrackerSpec);
} else {
String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
URL validate = conf.getResource(classpathFile);
if (validate == null) {
throw new RuntimeException(classpathFile + " not found on CLASSPATH");
}
conf.addResource(classpathFile);
}
}
return conf;
}
/**
* Sets the output filter for tasks. only those tasks are printed whose
* output matches the filter.
* @param newValue task filter.
*/
@Deprecated
public void setTaskOutputFilter(TaskStatusFilter newValue){
this.taskOutputFilter = newValue;
}
/**
* Get the task output filter out of the JobConf.
*
* @param job the JobConf to examine.
* @return the filter level.
*/
public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
"FAILED"));
}
/**
* Modify the JobConf to set the task output filter.
*
* @param job the JobConf to modify.
* @param newValue the value to set.
*/
public static void setTaskOutputFilter(JobConf job,
TaskStatusFilter newValue) {
job.set("jobclient.output.filter", newValue.toString());
}
/**
* Returns task output filter.
* @return task filter.
*/
@Deprecated
public TaskStatusFilter getTaskOutputFilter(){
return this.taskOutputFilter;
}
protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
String counterGroupName, String counterName) throws IOException {
Counters counters = Counters.downgrade(cntrs);
return counters.findCounter(counterGroupName, counterName).getValue();
}
/**
* Get status information about the max available Maps in the cluster.
*
* @return the max available Maps in the cluster
* @throws IOException
*/
public int getDefaultMaps() throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws IOException, InterruptedException {
return cluster.getClusterStatus().getMapSlotCapacity();
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Get status information about the max available Reduces in the cluster.
*
* @return the max available Reduces in the cluster
* @throws IOException
*/
public int getDefaultReduces() throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws IOException, InterruptedException {
return cluster.getClusterStatus().getReduceSlotCapacity();
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Grab the jobtracker system directory path where job-specific files are to be placed.
*
* @return the system directory where job-specific files are to be placed.
*/
public Path getSystemDir() {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
@Override
public Path run() throws IOException, InterruptedException {
return cluster.getSystemDir();
}
});
} catch (IOException ioe) {
return null;
} catch (InterruptedException ie) {
return null;
}
}
/**
* Checks if the job directory is clean and has all the required components
* for (re) starting the job
*/
public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
throws IOException {
FileStatus[] contents = fs.listStatus(jobDirPath);
int matchCount = 0;
if (contents != null && contents.length >= 2) {
for (FileStatus status : contents) {
if ("job.xml".equals(status.getPath().getName())) {
++matchCount;
}
if ("job.split".equals(status.getPath().getName())) {
++matchCount;
}
}
if (matchCount == 2) {
return true;
}
}
return false;
}
/**
* Fetch the staging area directory for the application
*
* @return path to staging area directory
* @throws IOException
*/
public Path getStagingAreaDir() throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
@Override
public Path run() throws IOException, InterruptedException {
return cluster.getStagingAreaDir();
}
});
} catch (InterruptedException ie) {
// throw RuntimeException instead for compatibility reasons
throw new RuntimeException(ie);
}
}
private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
JobQueueInfo ret = new JobQueueInfo(queue);
// make sure to convert any children
if (queue.getQueueChildren().size() > 0) {
List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
.getQueueChildren().size());
for (QueueInfo child : queue.getQueueChildren()) {
childQueues.add(getJobQueueInfo(child));
}
ret.setChildren(childQueues);
}
return ret;
}
private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
throws IOException {
JobQueueInfo[] ret = new JobQueueInfo[queues.length];
for (int i = 0; i < queues.length; i++) {
ret[i] = getJobQueueInfo(queues[i]);
}
return ret;
}
/**
* Returns an array of queue information objects about root level queues
* configured
*
* @return the array of root level JobQueueInfo objects
* @throws IOException
*/
public JobQueueInfo[] getRootQueues() throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
public JobQueueInfo[] run() throws IOException, InterruptedException {
return getJobQueueInfoArray(cluster.getRootQueues());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Returns an array of queue information objects about immediate children
* of queue queueName.
*
* @param queueName
* @return the array of immediate children JobQueueInfo objects
* @throws IOException
*/
public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
public JobQueueInfo[] run() throws IOException, InterruptedException {
return getJobQueueInfoArray(cluster.getChildQueues(queueName));
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Return an array of queue information objects about all the Job Queues
* configured.
*
* @return Array of JobQueueInfo objects
* @throws IOException
*/
public JobQueueInfo[] getQueues() throws IOException {
try {
return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
public JobQueueInfo[] run() throws IOException, InterruptedException {
return getJobQueueInfoArray(cluster.getQueues());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Gets all the jobs which were added to particular Job Queue
*
* @param queueName name of the Job Queue
* @return Array of jobs present in the job queue
* @throws IOException
*/
public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
try {
QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
@Override
public QueueInfo run() throws IOException, InterruptedException {
return cluster.getQueue(queueName);
}
});
if (queue == null) {
return null;
}
org.apache.hadoop.mapreduce.JobStatus[] stats =
queue.getJobStatuses();
JobStatus[] ret = new JobStatus[stats.length];
for (int i = 0 ; i < stats.length; i++ ) {
ret[i] = JobStatus.downgrade(stats[i]);
}
return ret;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Gets the queue information associated to a particular Job Queue
*
* @param queueName name of the job queue.
* @return Queue information associated to particular queue.
* @throws IOException
*/
public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
try {
QueueInfo queueInfo = clientUgi.doAs(new
PrivilegedExceptionAction<QueueInfo>() {
public QueueInfo run() throws IOException, InterruptedException {
return cluster.getQueue(queueName);
}
});
if (queueInfo != null) {
return new JobQueueInfo(queueInfo);
}
return null;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Gets the Queue ACLs for current user
* @return array of QueueAclsInfo object for current user.
* @throws IOException
*/
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
try {
org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
clientUgi.doAs(new
PrivilegedExceptionAction
<org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
throws IOException, InterruptedException {
return cluster.getQueueAclsForCurrentUser();
}
});
QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
for (int i = 0 ; i < acls.length; i++ ) {
ret[i] = QueueAclsInfo.downgrade(acls[i]);
}
return ret;
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
/**
* Get a delegation token for the user from the JobTracker.
* @param renewer the user who can renew the token
* @return the new token
* @throws IOException
*/
public Token<DelegationTokenIdentifier>
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException,
InterruptedException {
return cluster.getDelegationToken(renewer);
}
});
}
/**
* Renew a delegation token
* @param token the token to renew
* @return true if the renewal went well
* @throws InvalidToken
* @throws IOException
* @deprecated Use {@link Token#renew} instead
*/
public long renewDelegationToken(Token<DelegationTokenIdentifier> token
) throws InvalidToken, IOException,
InterruptedException {
return token.renew(getConf());
}
/**
* Cancel a delegation token from the JobTracker
* @param token the token to cancel
* @throws IOException
* @deprecated Use {@link Token#cancel} instead
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
) throws InvalidToken, IOException,
InterruptedException {
token.cancel(getConf());
}
/**
*/
public static void main(String argv[]) throws Exception {
int res = ToolRunner.run(new JobClient(), argv);
System.exit(res);
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦