From 02ef187a353699462527e77e74d2125b40bc65cb Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Tue, 5 May 2015 21:51:45 +0100 Subject: [PATCH 1/2] Launch separate task trackers for Map/Reduce slots --- .../apache/hadoop/mapred/MesosTracker.java | 29 +- .../apache/hadoop/mapred/ResourcePolicy.java | 531 ++++++++++-------- 2 files changed, 307 insertions(+), 253 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 021e0ea..e5f8adf 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -3,6 +3,7 @@ import org.apache.commons.httpclient.HttpHost; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.Protos.TaskID; import java.util.Collection; @@ -20,8 +21,8 @@ public class MesosTracker { public static final Log LOG = LogFactory.getLog(MesosScheduler.class); public volatile HttpHost host; public TaskID taskId; - public long mapSlots; - public long reduceSlots; + public long slots; + public TaskType slotType; public volatile long idleCounter = 0; public volatile long idleCheckInterval = 0; public volatile long idleCheckMax = 0; @@ -32,12 +33,12 @@ public class MesosTracker { public Set jobs = Collections.newSetFromMap(new ConcurrentHashMap()); public com.codahale.metrics.Timer.Context context; - public MesosTracker(HttpHost host, TaskID taskId, long mapSlots, - long reduceSlots, MesosScheduler scheduler) { + public MesosTracker(HttpHost host, TaskID taskId, long slots, + TaskType slotType, MesosScheduler scheduler) { this.host = host; this.taskId = taskId; - this.mapSlots = mapSlots; - this.reduceSlots = reduceSlots; + this.slots = slots; + this.slotType = slotType; this.scheduler = scheduler; if (scheduler.metrics != null) { @@ -119,23 +120,23 @@ public void run() { return; } - long idleMapSlots = 0; - long idleReduceSlots = 0; + long idleSlots = 0; Collection taskTrackers = scheduler.jobTracker.taskTrackers(); for (TaskTrackerStatus status : taskTrackers) { HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); if (host.toString().equals(MesosTracker.this.host.toString())) { - idleMapSlots += status.getAvailableMapSlots(); - idleReduceSlots += status.getAvailableReduceSlots(); + if (slotType == TaskType.MAP) { + idleSlots += status.getAvailableMapSlots(); + } else { + idleSlots += status.getAvailableReduceSlots(); + } + break; } } - trackerIsIdle = idleMapSlots == MesosTracker.this.mapSlots && - idleReduceSlots == MesosTracker.this.reduceSlots; - - if (trackerIsIdle) { + if (idleSlots == MesosTracker.this.slots) { LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); MesosTracker.this.idleCounter += 1; } else { diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 3a52888..df1f31b 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -120,8 +120,11 @@ public void computeNeededSlots(List jobsInProgress, int inactiveReduceSlots = 0; for (MesosTracker tracker : scheduler.mesosTrackers.values()) { if (!tracker.active) { - inactiveMapSlots += tracker.mapSlots; - inactiveReduceSlots += tracker.reduceSlots; + if (tracker.slotType == TaskType.MAP) { + inactiveMapSlots += tracker.slots; + } else { + inactiveReduceSlots += tracker.slots; + } } } @@ -225,9 +228,14 @@ public void resourceOffers(SchedulerDriver schedulerDriver, neededMapSlots = Math.max(0, neededMapSlots); neededReduceSlots = Math.max(0, neededReduceSlots); + // Figure out how many ports we actually need from the resources set + int neededPorts = (neededMapSlots > 0 && neededReduceSlots > 0) ? + 4 : ((neededMapSlots > 0 || neededReduceSlots > 0) ? 2 : 0); + cpus = -1.0; mem = -1.0; disk = -1.0; + Set ports = new HashSet(); String cpuRole = new String("*"); String memRole = cpuRole; @@ -258,7 +266,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end); continue; } - while (begin <= end && ports.size() < 2) { + while (begin <= end && ports.size() < neededPorts) { int port = begin + (int)(Math.random() * ((end - begin) + 1)); ports.add(port); begin += 1; @@ -287,276 +295,321 @@ public void resourceOffers(SchedulerDriver schedulerDriver, double taskMem = (mapSlots + reduceSlots) * slotMem + containerMem; double taskDisk = (mapSlots + reduceSlots) * slotDisk + containerDisk; - if (!sufficient || ports.size() < 2) { + if (!sufficient || ports.size() < neededPorts) { LOG.info(join("\n", Arrays.asList( "Declining offer with insufficient resources for a TaskTracker: ", " cpus: offered " + cpus + " needed at least " + taskCpus, " mem : offered " + mem + " needed at least " + taskMem, " disk: offered " + disk + " needed at least " + taskDisk, - " ports: " + (ports.size() < 2 - ? " less than 2 offered" - : " at least 2 (sufficient)")))); + " ports: " + (ports.size() < neededPorts + ? " less than " + neededPorts + " offered" + : " at least " + neededPorts + " (sufficient)")))); schedulerDriver.declineOffer(offer.getId()); continue; } Iterator portIter = ports.iterator(); - HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next()); - HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next()); - - // Check that this tracker is not already launched. This problem was - // observed on a few occasions, but not reliably. The main symptom was - // that entries in `mesosTrackers` were being lost, and task trackers - // would be 'lost' mysteriously (probably because the ports were in - // use). This problem has since gone away with a rewrite of the port - // selection code, but the check + logging is left here. - // TODO(brenden): Diagnose this to determine root cause. - if (scheduler.mesosTrackers.containsKey(httpAddress)) { - LOG.info(join("\n", Arrays.asList( - "Declining offer because host/port combination is in use: ", - " cpus: offered " + cpus + " needed " + taskCpus, - " mem : offered " + mem + " needed " + taskMem, - " disk: offered " + disk + " needed " + taskDisk, - " ports: " + ports))); - - schedulerDriver.declineOffer(offer.getId()); - continue; - } + ArrayList tasks = new ArrayList(); + + if (mapSlots > 0) { + TaskInfo taskInfo = buildTaskInfo( + offer, + mapSlots, + TaskType.MAP, + portIter.next(), + portIter.next(), + cpuRole, memRole, diskRole, portsRole + ); + + if (taskInfo == null) { + schedulerDriver.declineOffer(offer.getId()); + continue; + } - TaskID taskId = TaskID.newBuilder() - .setValue("Task_Tracker_" + scheduler.launchedTrackers++).build(); - - LOG.info("Launching task " + taskId.getValue() + " on " - + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots); - - List defaultJvmOpts = Arrays.asList( - "-XX:+UseConcMarkSweepGC", - "-XX:+CMSParallelRemarkEnabled", - "-XX:+CMSClassUnloadingEnabled", - "-XX:+UseParNewGC", - "-XX:TargetSurvivorRatio=80", - "-XX:+UseTLAB", - "-XX:ParallelGCThreads=2", - "-XX:+AggressiveOpts", - "-XX:+UseCompressedOops", - "-XX:+UseFastEmptyMethods", - "-XX:+UseFastAccessorMethods", - "-Xss512k", - "-XX:+AlwaysPreTouch", - "-XX:CMSInitiatingOccupancyFraction=80" - ); - - String jvmOpts = scheduler.conf.get("mapred.mesos.executor.jvm.opts"); - if (jvmOpts == null) { - jvmOpts = StringUtils.join(" ", defaultJvmOpts); + tasks.add(taskInfo); } - // Set up the environment for running the TaskTracker. - Protos.Environment.Builder envBuilder = Protos.Environment - .newBuilder() - .addVariables( - Protos.Environment.Variable.newBuilder() - .setName("HADOOP_OPTS") - .setValue( - jvmOpts + - " -Xmx" + tasktrackerJVMHeap + "m" + - " -XX:NewSize=" + tasktrackerJVMHeap / 3 + "m -XX:MaxNewSize=" + (int)Math.floor - (tasktrackerJVMHeap * 0.6) + "m" - )); - - // Set java specific environment, appropriately. - Map env = System.getenv(); - if (env.containsKey("JAVA_HOME")) { - envBuilder.addVariables(Protos.Environment.Variable.newBuilder() - .setName("JAVA_HOME") - .setValue(env.get("JAVA_HOME"))); - } + if (mapSlots > 0) { + TaskInfo taskInfo = buildTaskInfo( + offer, + reduceSlots, + TaskType.REDUCE, + portIter.next(), + portIter.next(), + cpuRole, memRole, diskRole, portsRole + ); + + if (taskInfo == null) { + schedulerDriver.declineOffer(offer.getId()); + continue; + } - if (env.containsKey("JAVA_LIBRARY_PATH")) { - envBuilder.addVariables(Protos.Environment.Variable.newBuilder() - .setName("JAVA_LIBRARY_PATH") - .setValue(env.get("JAVA_LIBRARY_PATH"))); + tasks.add(taskInfo); } - // Command info differs when performing a local run. - String master = scheduler.conf.get("mapred.mesos.master"); + // Launch the tasks or decline the offer + if (tasks.size() > 0) { + schedulerDriver.launchTasks(Arrays.asList(offer.getId()), tasks); - if (master == null) { - throw new RuntimeException( - "Expecting configuration property 'mapred.mesos.master'"); - } else if (master == "local") { - throw new RuntimeException( - "Can not use 'local' for 'mapred.mesos.executor'"); + neededMapSlots -= mapSlots; + neededReduceSlots -= reduceSlots; + } else { + schedulerDriver.declineOffer(offer.getId()); } + } - String uri = scheduler.conf.get("mapred.mesos.executor.uri"); - String directory = scheduler.conf.get("mapred.mesos.executor.directory"); - boolean isUriSet = uri != null && !uri.equals(""); - boolean isDirectorySet = directory != null && !directory.equals(""); - - if (!isUriSet && !isDirectorySet) { - throw new RuntimeException( - "Expecting configuration property 'mapred.mesos.executor'"); - } else if (isUriSet && isDirectorySet) { - throw new RuntimeException( - "Conflicting properties 'mapred.mesos.executor.uri' and 'mapred.mesos.executor.directory', only one can be set"); - } else if (!isDirectorySet) { - LOG.info("URI: " + uri + ", name: " + new File(uri).getName()); - - directory = new File(uri).getName().split("\\.")[0] + "*"; - } else if (!isUriSet) { - LOG.info("mapred.mesos.executor.uri is not set, relying on configured 'mapred.mesos.executor.directory' for working Hadoop distribution"); - } + if (neededMapSlots <= 0 && neededReduceSlots <= 0) { + LOG.info("Satisfied map and reduce slots needed."); + } else { + LOG.info("Unable to fully satisfy needed map/reduce slots: " + + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "") + + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "") + + "remaining"); + } + } + } - String command = scheduler.conf.get("mapred.mesos.executor.command"); - if (command == null || command.equals("")) { - command = "env ; ./bin/hadoop org.apache.hadoop.mapred.MesosExecutor"; - } + private TaskInfo buildTaskInfo(Offer offer, long slots, TaskType type, + Integer httpPort, Integer reportPort, + String cpuRole, String memRole, String diskRole, + String portsRole) { + + HttpHost httpAddress = new HttpHost(offer.getHostname(), httpPort); + HttpHost reportAddress = new HttpHost(offer.getHostname(), reportPort); + + // Check that this tracker is not already launched. This problem was + // observed on a few occasions, but not reliably. The main symptom was + // that entries in `mesosTrackers` were being lost, and task trackers + // would be 'lost' mysteriously (probably because the ports were in + // use). This problem has since gone away with a rewrite of the port + // selection code, but the check + logging is left here. + // TODO(brenden): Diagnose this to determine root cause. + if (scheduler.mesosTrackers.containsKey(httpAddress)) { + LOG.info("Declining offer because host/port combination is in use"); + return null; + } - CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); - commandInfo - .setEnvironment(envBuilder) - .setValue(String.format("cd %s && %s", directory, command)); - if (uri != null) { - commandInfo.addUris(CommandInfo.URI.newBuilder().setValue(uri)); - } + String typeStr = (type == TaskType.MAP) ? "Map" : "Reduce"; + TaskID taskId = TaskID.newBuilder() + .setValue("Task_Tracker_" + typeStr + "_" + scheduler.launchedTrackers++) + .build(); + + LOG.info("Launching task " + taskId.getValue() + " on " + + httpAddress.toString() + " with " + typeStr.toLowerCase() + "Slots=" + slots); + + List defaultJvmOpts = Arrays.asList( + "-XX:+UseConcMarkSweepGC", + "-XX:+CMSParallelRemarkEnabled", + "-XX:+CMSClassUnloadingEnabled", + "-XX:+UseParNewGC", + "-XX:TargetSurvivorRatio=80", + "-XX:+UseTLAB", + "-XX:ParallelGCThreads=2", + "-XX:+AggressiveOpts", + "-XX:+UseCompressedOops", + "-XX:+UseFastEmptyMethods", + "-XX:+UseFastAccessorMethods", + "-Xss512k", + "-XX:+AlwaysPreTouch", + "-XX:CMSInitiatingOccupancyFraction=80" + ); + + String jvmOpts = scheduler.conf.get("mapred.mesos.executor.jvm.opts"); + if (jvmOpts == null) { + jvmOpts = StringUtils.join(" ", defaultJvmOpts); + } - // Populate ContainerInfo if needed - String containerImage = scheduler.conf.get("mapred.mesos.container.image"); - String[] containerOptions = scheduler.conf.getStrings("mapred.mesos.container.options"); + // Set up the environment for running the TaskTracker. + Protos.Environment.Builder envBuilder = Protos.Environment + .newBuilder() + .addVariables( + Protos.Environment.Variable.newBuilder() + .setName("HADOOP_OPTS") + .setValue( + jvmOpts + + " -Xmx" + tasktrackerJVMHeap + "m" + + " -XX:NewSize=" + tasktrackerJVMHeap / 3 + "m -XX:MaxNewSize=" + (int)Math.floor + (tasktrackerJVMHeap * 0.6) + "m" + )); + + // Set java specific environment, appropriately. + Map env = System.getenv(); + if (env.containsKey("JAVA_HOME")) { + envBuilder.addVariables(Protos.Environment.Variable.newBuilder() + .setName("JAVA_HOME") + .setValue(env.get("JAVA_HOME"))); + } - if (containerImage != null || (containerOptions != null && containerOptions.length > 0)) { - CommandInfo.ContainerInfo.Builder containerInfo = - CommandInfo.ContainerInfo.newBuilder(); + if (env.containsKey("JAVA_LIBRARY_PATH")) { + envBuilder.addVariables(Protos.Environment.Variable.newBuilder() + .setName("JAVA_LIBRARY_PATH") + .setValue(env.get("JAVA_LIBRARY_PATH"))); + } - if (containerImage != null) { - containerInfo.setImage(containerImage); - } + // Command info differs when performing a local run. + String master = scheduler.conf.get("mapred.mesos.master"); - if (containerOptions != null) { - for (int i = 0; i < containerOptions.length; i++) { - containerInfo.addOptions(containerOptions[i]); - } - } + if (master == null) { + throw new RuntimeException( + "Expecting configuration property 'mapred.mesos.master'"); + } else if (master == "local") { + throw new RuntimeException( + "Can not use 'local' for 'mapred.mesos.executor'"); + } - commandInfo.setContainer(containerInfo.build()); - } + String uri = scheduler.conf.get("mapred.mesos.executor.uri"); + String directory = scheduler.conf.get("mapred.mesos.executor.directory"); + boolean isUriSet = uri != null && !uri.equals(""); + boolean isDirectorySet = directory != null && !directory.equals(""); + + if (!isUriSet && !isDirectorySet) { + throw new RuntimeException( + "Expecting configuration property 'mapred.mesos.executor'"); + } else if (isUriSet && isDirectorySet) { + throw new RuntimeException( + "Conflicting properties 'mapred.mesos.executor.uri' and 'mapred.mesos.executor.directory', only one can be set"); + } else if (!isDirectorySet) { + LOG.info("URI: " + uri + ", name: " + new File(uri).getName()); + + directory = new File(uri).getName().split("\\.")[0] + "*"; + } else if (!isUriSet) { + LOG.info("mapred.mesos.executor.uri is not set, relying on configured 'mapred.mesos.executor.directory' for working Hadoop distribution"); + } - // Create a configuration from the current configuration and - // override properties as appropriate for the TaskTracker. - Configuration overrides = new Configuration(scheduler.conf); - - overrides.set("mapred.task.tracker.http.address", - httpAddress.getHostName() + ':' + httpAddress.getPort()); - - overrides.set("mapred.task.tracker.report.address", - reportAddress.getHostName() + ':' + reportAddress.getPort()); - - overrides.setLong("mapred.tasktracker.map.tasks.maximum", mapSlots); - overrides.setLong("mapred.tasktracker.reduce.tasks.maximum", reduceSlots); - - // Build up the executor info - ExecutorInfo executor = ExecutorInfo - .newBuilder() - .setExecutorId(ExecutorID.newBuilder().setValue( - "executor_" + taskId.getValue())) - .setName("Hadoop TaskTracker") - .setSource(taskId.getValue()) - .addResources( - Resource - .newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setRole(cpuRole) - .setScalar(Value.Scalar.newBuilder().setValue(containerCpus))) - .addResources( - Resource - .newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(containerMem))) - .addResources( - Resource - .newBuilder() - .setName("disk") - .setType(Value.Type.SCALAR) - .setRole(diskRole) - .setScalar(Value.Scalar.newBuilder().setValue(containerDisk))) - .setCommand(commandInfo.build()) - .build(); - - ByteString taskData; + String command = scheduler.conf.get("mapred.mesos.executor.command"); + if (command == null || command.equals("")) { + command = "env ; ./bin/hadoop org.apache.hadoop.mapred.MesosExecutor"; + } - try { - taskData = org.apache.mesos.hadoop.Utils.confToBytes(overrides); - } catch (IOException e) { - LOG.error("Caught exception serializing configuration"); + CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); + commandInfo + .setEnvironment(envBuilder) + .setValue(String.format("cd %s && %s", directory, command)); + if (uri != null) { + commandInfo.addUris(CommandInfo.URI.newBuilder().setValue(uri)); + } - // Skip this offer completely - schedulerDriver.declineOffer(offer.getId()); - continue; - } + // Populate ContainerInfo if needed + String containerImage = scheduler.conf.get("mapred.mesos.container.image"); + String[] containerOptions = scheduler.conf.getStrings("mapred.mesos.container.options"); + + if (containerImage != null || (containerOptions != null && containerOptions.length > 0)) { + CommandInfo.ContainerInfo.Builder containerInfo = + CommandInfo.ContainerInfo.newBuilder(); - // Create the TaskTracker TaskInfo - TaskInfo trackerTaskInfo = TaskInfo - .newBuilder() - .setName(taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .addResources( - Resource - .newBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRole(portsRole) - .setRanges( - Value.Ranges - .newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(httpAddress.getPort()) - .setEnd(httpAddress.getPort())) - .addRange(Value.Range.newBuilder() - .setBegin(reportAddress.getPort()) - .setEnd(reportAddress.getPort())))) - .addResources( - Resource - .newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setRole(cpuRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskCpus - containerCpus))) - .addResources( - Resource - .newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskMem - containerCpus))) - .setData(taskData) - .setExecutor(executor) - .build(); - - // Add this tracker to Mesos tasks. - scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId, - mapSlots, reduceSlots, scheduler)); - - // Launch the task - schedulerDriver.launchTasks(Arrays.asList(offer.getId()), Arrays.asList(trackerTaskInfo)); - - neededMapSlots -= mapSlots; - neededReduceSlots -= reduceSlots; + if (containerImage != null) { + containerInfo.setImage(containerImage); } - if (neededMapSlots <= 0 && neededReduceSlots <= 0) { - LOG.info("Satisfied map and reduce slots needed."); - } else { - LOG.info("Unable to fully satisfy needed map/reduce slots: " - + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "") - + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "") - + "remaining"); + if (containerOptions != null) { + for (int i = 0; i < containerOptions.length; i++) { + containerInfo.addOptions(containerOptions[i]); + } } + + commandInfo.setContainer(containerInfo.build()); } + + // Create a configuration from the current configuration and + // override properties as appropriate for the TaskTracker. + Configuration overrides = new Configuration(scheduler.conf); + + overrides.set("mapred.task.tracker.http.address", + httpAddress.getHostName() + ':' + httpAddress.getPort()); + + overrides.set("mapred.task.tracker.report.address", + reportAddress.getHostName() + ':' + reportAddress.getPort()); + + overrides.setLong("mapred.tasktracker." + typeStr.toLowerCase() + ".tasks.maximum", slots); + + // Build up the executor info + ExecutorInfo executor = ExecutorInfo + .newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue( + "executor_" + taskId.getValue())) + .setName("Hadoop TaskTracker") + .setSource(taskId.getValue()) + .addResources( + Resource + .newBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setRole(cpuRole) + .setScalar(Value.Scalar.newBuilder().setValue(containerCpus))) + .addResources( + Resource + .newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setRole(memRole) + .setScalar(Value.Scalar.newBuilder().setValue(containerMem))) + .addResources( + Resource + .newBuilder() + .setName("disk") + .setType(Value.Type.SCALAR) + .setRole(diskRole) + .setScalar(Value.Scalar.newBuilder().setValue(containerDisk))) + .setCommand(commandInfo.build()) + .build(); + + ByteString taskData; + + try { + taskData = org.apache.mesos.hadoop.Utils.confToBytes(overrides); + } catch (IOException e) { + LOG.error("Caught exception serializing configuration."); + return null; + } + + double taskCpus = slots * slotCpus; + double taskMem = slots * slotMem; + double taskDisk = slots * slotDisk; + + // Create the TaskTracker TaskInfo + TaskInfo trackerTaskInfo = TaskInfo + .newBuilder() + .setName(taskId.getValue()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId()) + .addResources( + Resource + .newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRole(portsRole) + .setRanges( + Value.Ranges + .newBuilder() + .addRange(Value.Range.newBuilder() + .setBegin(httpAddress.getPort()) + .setEnd(httpAddress.getPort())) + .addRange(Value.Range.newBuilder() + .setBegin(reportAddress.getPort()) + .setEnd(reportAddress.getPort())))) + .addResources( + Resource + .newBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setRole(cpuRole) + .setScalar(Value.Scalar.newBuilder().setValue(taskCpus))) + .addResources( + Resource + .newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setRole(memRole) + .setScalar(Value.Scalar.newBuilder().setValue(taskMem))) + .setData(taskData) + .setExecutor(executor) + .build(); + + // Add this tracker to Mesos tasks. + scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId, + slots, type, scheduler)); + + return trackerTaskInfo; } } From 4a7b729e1edcfcad4ba318d8c0e955c73d495ae6 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Tue, 5 May 2015 22:09:08 +0100 Subject: [PATCH 2/2] Check reduceSlots instead of mapSlots, typo --- src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index df1f31b..f3c9163 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -330,7 +330,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, tasks.add(taskInfo); } - if (mapSlots > 0) { + if (reduceSlots > 0) { TaskInfo taskInfo = buildTaskInfo( offer, reduceSlots,