Apache Mesos Framework

Apache Mesos is a cluster manager that provides efficient resource sharing across distributed applications. One of its advantages is linear scalability. Companies such as Twitter and Airbnb have used Mesos and built their own frameworks on top of it.

Let’s take a couple of examples:

  • Spark can use Mesos as its cluster manager, in which case Mesos sits between the Spark driver program and the worker nodes.
  • Mesos can also distribute a set of tasks among the available nodes (machines), depending on which machine has the capacity to take on a task. Some companies have even built frameworks on top of Mesos that combine cron expressions with Mesos scheduling to replace their regularly scheduled jobs.

In this post I will walk through the basic architecture of Apache Mesos and then follow up with a simple example of a Mesos framework.

Architecture

Figure: Mesos architecture

The Mesos architecture has three main components:

  • Framework: consists of a scheduler, which registers with the master and receives resource offers through the Scheduler callback interface, and an executor, which is launched on the agents to run the framework's tasks. The scheduler is how the framework talks to the master.
  • Master: enables fine-grained sharing of resources (CPU, RAM, …) across frameworks by making them resource offers. Each resource offer contains a list of <agentId, resource1: amount1, resource2: amount2…>. Most of the master's state about resources and tasks is held in memory and has to be rebuilt from re-registering agents and frameworks after a failure; while no master is available, running tasks continue but no new tasks can be launched. For this reason the master is deployed in a high-availability configuration: standby master daemons run alongside one leader, and they rely on ZooKeeper to elect a new leader when the current one fails.
  • Agents (earlier called slaves): a Mesos cluster runs an agent on every machine that contributes resources. Each agent reports its available resources to the master and, in turn, receives the tasks that frameworks have scheduled on it. When a task completes or is lost, its resources are freed and offered again, and the cycle repeats.
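The offer format <agentId, resource1: amount1, resource2: amount2, ...> and the scheduler's accept-or-decline decision can be modelled without Mesos at all. The sketch below is a hypothetical illustration (OfferModel, Offer and fits are not Mesos classes): a scheduler accepts an offer only if it covers every resource its task needs, and declines it otherwise so the master can offer those resources to another framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Mesos-free model of a resource offer: <agentId, resource1: amount1, ...>.
public class OfferModel {

  static final class Offer {
    final String agentId;
    final Map<String, Double> resources; // resource name -> amount, e.g. "cpus" -> 4.0, "mem" -> 2048.0 (MB)

    Offer(String agentId, Map<String, Double> resources) {
      this.agentId = agentId;
      this.resources = resources;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder("<").append(agentId);
      for (Map.Entry<String, Double> e : resources.entrySet()) {
        sb.append(", ").append(e.getKey()).append(": ").append(e.getValue());
      }
      return sb.append('>').toString();
    }
  }

  // Accept only if the offer holds at least as much of every resource the task needs.
  static boolean fits(Offer offer, Map<String, Double> taskNeeds) {
    for (Map.Entry<String, Double> need : taskNeeds.entrySet()) {
      if (offer.resources.getOrDefault(need.getKey(), 0.0) < need.getValue()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Double> resources = new LinkedHashMap<>();
    resources.put("cpus", 4.0);
    resources.put("mem", 2048.0);
    Offer offer = new Offer("agent-1", resources);

    Map<String, Double> oneCpu = new LinkedHashMap<>();
    oneCpu.put("cpus", 1.0);
    Map<String, Double> eightCpus = new LinkedHashMap<>();
    eightCpus.put("cpus", 8.0);

    System.out.println(offer);                                         // <agent-1, cpus: 4.0, mem: 2048.0>
    System.out.println(fits(offer, oneCpu) ? "accept" : "decline");    // accept
    System.out.println(fits(offer, eightCpus) ? "accept" : "decline"); // decline
  }
}
```

In a real framework the offer arrives as a Protos.Offer whose resources are Protos.Resource entries, but the decision has the same shape.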

Example

Code can be found here: https://github.com/nihitpurwar/example-mesos

I set up a local Mesos cluster on a Mac with the following steps:

$ brew install mesos

# start the master
$ /usr/local/sbin/mesos-master --registry=in_memory --ip=127.0.0.1

# start one agent
$ sudo /usr/local/sbin/mesos-agent --master=127.0.0.1:5050 --work_dir=/Users/npurwar/tmp/mesos-slaves

You can check that the master has started by opening http://localhost:5050, which serves the Mesos web UI.
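If you would rather check from code than from a browser, the sketch below probes the master over HTTP. It assumes the master answers on its /master/health endpoint (same port as the web UI) with 200 OK when it is healthy; MasterHealthCheck and isMasterUp are hypothetical names, and Java 11+ is assumed for java.net.http.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper: returns true if a Mesos master at baseUrl reports itself healthy.
public class MasterHealthCheck {

  static boolean isMasterUp(String baseUrl) {
    HttpClient client = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(2))
        .build();
    // Assumption: the master exposes /master/health and returns 200 OK when healthy.
    HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/master/health"))
        .timeout(Duration.ofSeconds(2))
        .GET()
        .build();
    try {
      HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
      return response.statusCode() == 200;
    } catch (IOException e) {
      return false; // connection refused, timeout, and similar failures
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    String master = args.length > 0 ? args[0] : "http://127.0.0.1:5050";
    System.out.println(master + (isMasterUp(master) ? " is up" : " is not reachable"));
  }
}
```

Run it with the master URL as its only argument, or with no argument to probe http://127.0.0.1:5050 as started above.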

Next, we implement the Executor interface. The executor is the code that ultimately runs on the agents. Here it just appends a line to a local file (see the launchTask method), but as you can imagine, launchTask could call whatever code you want, even code looked up via reflection.

import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;

import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos;

public class MyMesosExecutor implements Executor {

  private static final String FILE_NAME = "/Users/npurwar/tmp/test-mesos.txt";

  @Override
  public void registered(ExecutorDriver executorDriver, Protos.ExecutorInfo executorInfo,
      Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
    System.err.println("MyMesosExecutor registered");
  }

  @Override
  public void reregistered(ExecutorDriver executorDriver, Protos.SlaveInfo slaveInfo) {
    System.err.println("MyMesosExecutor reregistered");
  }

  @Override
  public void disconnected(ExecutorDriver executorDriver) {
    System.err.println("MyMesosExecutor disconnected");
  }

  @Override
  public void launchTask(ExecutorDriver driver, Protos.TaskInfo task) {
    // Tell Mesos (and, through it, the scheduler) that the task is now running.
    Protos.TaskStatus status =
        Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build();
    driver.sendStatusUpdate(status);

    String workStatement = "Mesos agent completed work at " + new Date();
    System.err.println(workStatement);

    // The actual work: append one line to a local file (the second argument enables append mode).
    Protos.TaskState finalState = Protos.TaskState.TASK_FINISHED;
    try (FileWriter fileWriter = new FileWriter(FILE_NAME, true)) {
      fileWriter.write(workStatement + "\n");
    } catch (IOException e) {
      System.err.println("Exception occurred " + e.getMessage());
      e.printStackTrace();
      finalState = Protos.TaskState.TASK_FAILED;
    }

    // Pass a message back to the scheduler's frameworkMessage() callback.
    driver.sendFrameworkMessage(workStatement.getBytes());

    // Report the terminal state so Mesos can release the task's resources.
    status = Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId()).setState(finalState).build();
    driver.sendStatusUpdate(status);
  }

  @Override
  public void killTask(ExecutorDriver executorDriver, Protos.TaskID taskID) {
    System.err.println("MyMesosExecutor killTask");
  }

  @Override
  public void frameworkMessage(ExecutorDriver executorDriver, byte[] bytes) {
    System.err.println("MyMesosExecutor frameworkMessage");
  }

  @Override
  public void shutdown(ExecutorDriver executorDriver) {
    System.err.println("MyMesosExecutor shutdown");
  }

  @Override
  public void error(ExecutorDriver executorDriver, String s) {
    System.err.println("MyMesosExecutor error " + s);
  }

  public static void main(String[] args) {
    MesosExecutorDriver driver = new MesosExecutorDriver(new MyMesosExecutor());
    System.exit(driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1);
  }
}
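As for calling code via reflection, here is a minimal sketch of that idea. It assumes the scheduler tells the executor which work to run by putting the fully qualified name of a Runnable class into the task; ReflectiveTaskRunner, runByName and HelloWork are hypothetical names, not part of Mesos or of the example repository.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper: run a unit of work chosen at runtime by class name.
public class ReflectiveTaskRunner {

  // Example work class; in practice it would live in the jar shipped to the agents.
  public static class HelloWork implements Runnable {
    static int runs = 0;

    @Override
    public void run() {
      runs++;
      System.err.println("HelloWork ran");
    }
  }

  // Loads className, checks that it is a Runnable, creates it with its no-arg constructor and runs it.
  static void runByName(String className) throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className);
    if (!Runnable.class.isAssignableFrom(clazz)) {
      throw new IllegalArgumentException(className + " does not implement Runnable");
    }
    Constructor<?> ctor = clazz.getDeclaredConstructor();
    Runnable work = (Runnable) ctor.newInstance();
    work.run();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    runByName(HelloWork.class.getName());
  }
}
```

Inside launchTask, one option (again an assumption) is for the scheduler to set the class name as the task's data with TaskInfo.Builder.setData, and for the executor to call runByName(task.getData().toStringUtf8()) before sending TASK_FINISHED.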

Next, we implement the Scheduler interface. The scheduler receives the resource offers that the master builds from the agents' resources and decides which tasks to launch on them. Remember that this interface is how the framework talks to the master.

import java.util.Collections;
import java.util.List;

import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;

public class MyMesosScheduler implements Scheduler {
  // CPUs requested by each task; offers with fewer CPUs are declined.
  private static final double TASK_CPUS = 1.0;

  private final Protos.ExecutorInfo executor;
  private int launchedTasks = 0;

  public MyMesosScheduler(Protos.ExecutorInfo executor) {
    this.executor = executor;
  }

  public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
      Protos.MasterInfo masterInfo) {
    System.err.println("registered");
  }

  public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
    System.err.println("re-registered");
  }

  public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> list) {
    System.err.println("resourceOffers");

    for (Protos.Offer offer : list) {
      // Log the offer and add up the CPUs it contains.
      double offeredCpus = 0;
      for (Protos.Resource resource : offer.getResourcesList()) {
        System.err.println("offer res = " + resource.getName() + " details " + resource.getScalar().getValue());
        if ("cpus".equals(resource.getName())) {
          offeredCpus += resource.getScalar().getValue();
        }
      }

      Protos.ExecutorInfo executorInfo = getExecutorFromOffer(offer);
      if (executorInfo != null && offeredCpus >= TASK_CPUS) {
        Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
        Protos.TaskInfo task = Protos.TaskInfo.newBuilder()
            .setName("Task " + taskId.getValue())
            .setTaskId(taskId)
            .setSlaveId(offer.getSlaveId())
            .addResources(Protos.Resource.newBuilder()
                .setName("cpus")
                .setType(Protos.Value.Type.SCALAR)
                .setScalar(Protos.Value.Scalar.newBuilder().setValue(TASK_CPUS)))
            .setExecutor(executorInfo)
            .build();
        // All offers passed to one launchTasks call must come from the same agent,
        // so launch against each offer separately.
        schedulerDriver.launchTasks(Collections.singletonList(offer.getId()), Collections.singletonList(task));
      } else {
        // Decline offers we have no task for (or that are too small) so they go back to the master.
        schedulerDriver.declineOffer(offer.getId());
      }
    }
  }

  private Protos.ExecutorInfo getExecutorFromOffer(Protos.Offer offer) {
    // TODO: can read from a db, etc to find the matching task
    return executor;
  }

  public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
    System.err.println("offerRescinded");
  }

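  public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
    // The message field is often empty, so also log which task changed and its new state.
    System.err.println("statusUpdate task " + taskStatus.getTaskId().getValue() + " -> " + taskStatus.getState()
        + " " + taskStatus.getMessage());
  }

  public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID,
      byte[] bytes) {
    // Messages sent by the executor via sendFrameworkMessage() arrive here.
    System.err.println("frameworkMessage " + new String(bytes));
  }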

  public void disconnected(SchedulerDriver schedulerDriver) {
    System.err.println("disconnected");
  }

  public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
    System.err.println("slaveLost");
  }

  public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID,
      int i) {
    System.err.println("executorLost");
  }

  public void error(SchedulerDriver schedulerDriver, String s) {
    System.err.println("error " + s);
  }
}
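The scheduler above places a single 1-CPU task on each offer. If several tasks could share an offer, the scheduler first needs to know how many fit. The pure-Java sketch below (OfferPacking and tasksThatFit are hypothetical, not Mesos API) computes that from an offer's cpus and mem scalars; each fitted task would then become one TaskInfo in the same launchTasks call, which is allowed because they all use a single offer from a single agent.

```java
// Hypothetical helper: how many identical tasks fit into one offer's cpus and mem?
public class OfferPacking {

  static int tasksThatFit(double offeredCpus, double offeredMemMb, double taskCpus, double taskMemMb) {
    if (taskCpus <= 0 || taskMemMb <= 0) {
      throw new IllegalArgumentException("task must request positive cpus and mem");
    }
    // Each resource gives an upper bound; the scarcer one decides.
    int byCpu = (int) Math.floor(offeredCpus / taskCpus);
    int byMem = (int) Math.floor(offeredMemMb / taskMemMb);
    return Math.min(byCpu, byMem);
  }

  public static void main(String[] args) {
    // 4 CPUs / 2048 MB holds four 1-CPU/256 MB tasks: CPU is the limit.
    System.out.println(tasksThatFit(4, 2048, 1, 256)); // 4
    // With 512 MB tasks and only 1024 MB offered, memory becomes the limit.
    System.out.println(tasksThatFit(4, 1024, 1, 512)); // 2
  }
}
```

Packing several tasks into one offer reduces the number of offer round trips, at the cost of concentrating work on fewer agents.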

Next, we write the framework's main class to connect the dots. It creates the ExecutorInfo (which tells the agent how to start the executor) and the scheduler, and passes them to a MesosSchedulerDriver, which connects to the master.

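import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.CommandInfo;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.FrameworkInfo;

public class MesosFramework {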
  // TODO: replace this
  private static final String JAR_WITH_DEPENDENCIES =
      "/Users/npurwar/work/example-mesos/target/example-mesos-1.0-SNAPSHOT-jar-with-dependencies.jar";

  public static void main(String[] args) throws InterruptedException {
    CommandInfo.URI uri = CommandInfo.URI.newBuilder().setValue(JAR_WITH_DEPENDENCIES).setExtract(false).build();
    String command = "java -cp " + JAR_WITH_DEPENDENCIES + " com.lms.mesos.executor.MyMesosExecutor";
    CommandInfo commandInfo = CommandInfo.newBuilder().setValue(command).addUris(uri).build();

    ExecutorInfo executorInfo = ExecutorInfo.newBuilder()
        .setExecutorId(Protos.ExecutorID.newBuilder().setValue("MyMesosExecutor"))
        .setCommand(commandInfo)
        .setName("My New Scheduler Executor")
        .setSource("java")
        .build();

    // setUser("") lets Mesos fill in the current user; the failover timeout is in seconds.
    FrameworkInfo.Builder frameworkBuilder =
        FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("My New Scheduler Framework");
    frameworkBuilder.setPrincipal("new-scheduler-framework");

    MesosSchedulerDriver driver =
        new MesosSchedulerDriver(new MyMesosScheduler(executorInfo), frameworkBuilder.build(), "127.0.0.1:5050");

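    // start() returns immediately, unlike run(), which blocks until the driver stops;
    // this lets the framework receive offers for 3 minutes before we stop it.
    driver.start();

    System.err.println("Stopping driver in 3 min");
    Thread.sleep(180 * 1000);

    driver.stop();
    // join() waits for the driver to finish stopping and returns its final status.
    int status = driver.join() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
    System.exit(status);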
  }
}

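Now, with these classes in a Maven project, we can build and run the framework with the commands below. mvn install must produce the jar-with-dependencies referenced by JAR_WITH_DEPENDENCIES, and mvn exec:java needs MesosFramework as its main class (configured in the exec-maven-plugin, or passed with -Dexec.mainClass).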

$ mvn install
$ mvn exec:java

Once the framework is running, open http://localhost:5050 to see the tasks being executed. See the screenshots below.

You can also tail the local file the executor writes to (/Users/npurwar/tmp/test-mesos.txt) to confirm that the agent is doing work:

Figure: the file the executor is writing to …

References:

http://mesos.apache.org/documentation/latest/architecture/
https://www.baeldung.com/apache-mesos
