4 TODO Items
This section will walk the developer through each of the QuickStart TODO items. As each TODO is completed, it is recommended to remove it from the code.
Throughout this chapter, an example Plugin Developer who is part of the “Orchid Organization” and developing a Launcher Plugin for the “Mars” job scheduling system will be used. The hypothetical Mars job scheduling system provides a C/C++ interface, which will be referred to as the mars_api. It provides a set of free functions which allow the caller to get details from the Mars job scheduling system. Any mars_api call may throw a mars_exception, which needs to be handled appropriately.
4.1 TODO #1: Change Namespaces
TODO Location | Impact |
---|---|
QuickStartMain.cpp | All source files |
The classes in the QuickStart RStudio Launcher Plugin are all in the rstudio::launcher_plugins::quickstart namespace. As desired, these namespaces may be changed.
4.2 TODO #2: Rename Classes
TODO Location | Impact |
---|---|
QuickStartMain.cpp | All source files |
4.2.1 Example
The developer may now wish to rename all the classes from QuickStart* to Mars*. For example, QuickStartMain would become MarsMain.
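In addition, the developer may also wish to modify the include guards to match their new namespace and class names. For example, a guard named after the rstudio::launcher_plugins::quickstart namespace and the QuickStartMain class might become one named after the developer's own namespace and the MarsMain class.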
The naming schemes described here are only recommendations. The exact naming scheme is completely at the discretion of the developer.
4.3 TODO #3: Change the Plugin Name
TODO Location | Impact |
---|---|
QuickStartMain.cpp | QuickStartMain.cpp |
The plugin name is used to generate the program ID, which will be used for logging, and the configuration file name. By default, the program ID will be "rstudio-" + getPluginName() + "-launcher". The configuration file name will be "launcher." + getPluginName() + ".conf" and must be located in /etc/rstudio.
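The default naming rules above can be sketched as two small string helpers. This is an illustration only: defaultProgramId and defaultConfigFile are hypothetical names that are not part of the SDK, which derives these values internally from getPluginName().

```cpp
#include <string>

// Hypothetical helpers that mirror the default naming rules described above.
// They are not SDK functions; the SDK computes these values itself.
std::string defaultProgramId(const std::string& in_pluginName)
{
   return "rstudio-" + in_pluginName + "-launcher";
}

std::string defaultConfigFile(const std::string& in_pluginName)
{
   // The configuration file must be located in /etc/rstudio.
   return "/etc/rstudio/launcher." + in_pluginName + ".conf";
}
```

With a plugin name of mars, these helpers yield rstudio-mars-launcher and /etc/rstudio/launcher.mars.conf.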
4.4 TODO #4: Optionally Change the Program ID
TODO Location | Impact |
---|---|
QuickStartMain.cpp | QuickStartMain.cpp |
The program ID will be used to uniquely identify logs from the plugin that are sent to the system log. By default, the value will be "rstudio-" + getPluginName() + "-launcher"; however, it is possible to override AbstractMain::getProgramID() to set a custom program ID.
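A minimal sketch of such an override follows. The AbstractMain class below is a simplified stand-in written for this example, reduced to the two methods discussed here; it is not the SDK's class, and the custom ID "orchid-mars-launcher" is an assumed value.

```cpp
#include <string>

// Simplified stand-in for the SDK's AbstractMain (assumption: only the two
// methods relevant to this TODO are modelled).
class AbstractMain
{
public:
   virtual ~AbstractMain() = default;

   virtual std::string getPluginName() const = 0;

   // Default program ID, as described above.
   virtual std::string getProgramID() const
   {
      return "rstudio-" + getPluginName() + "-launcher";
   }
};

class MarsMain : public AbstractMain
{
public:
   std::string getPluginName() const override
   {
      return "mars";
   }

   // Custom program ID; the value is an assumption chosen for illustration.
   std::string getProgramID() const override
   {
      return "orchid-mars-launcher";
   }
};
```

Overriding the program ID does not affect the configuration file name, which is still derived from the plugin name.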
4.5 TODO #5: Add Options
TODO Location | Impact |
---|---|
QuickStartOptions.cpp | QuickStartOptions.cpp, QuickStartOptions.hpp |
The QuickStartOptions class demonstrates how to add job scheduling specific options to the Plugin. It may not be immediately obvious what options will be required for the Plugin, but when it does become clear that an option is needed it can be added here.
4.5.1 Example
Suppose that the Mars job scheduling system can be controlled using an HTTP API, and it can be set up at a custom host and port. The Orchid developer might then add a required option for the host, an optional setting for whether SSL should be used, and an optional setting for the port. Additionally, the Mars Launcher Plugin will need to be able to make requests (such as listing all of the jobs in the system) as an administrator user, so an option for that user's name is needed as well. In that case, the developer might modify QuickStartOptions.hpp and QuickStartOptions.cpp in the following way:
QuickStartOptions.hpp
class QuickStartOptions : public Noncopyable
{
public:
// ...
bool getSampleOption() const;
// ...
private:
// ...
bool m_sampleOption;
};
QuickStartOptions.cpp
bool QuickStartOptions::getSampleOption() const
{
return m_sampleOption;
}
void QuickStartOptions::initialize()
{
// TODO #5: Add options, as necessary.
using namespace rstudio::launcher_plugins::options;
Options& options = Options::getInstance();
options.registerOptions()
("sample-option",
Value<bool>(m_sampleOption).setDefaultValue(true),
"sample option to demonstrate how to register options");
}
MarsOptions.hpp
namespace orchid {
namespace mars {
namespace options {
class MarsOptions : public Noncopyable
{
public:
// ...
const std::string& getMarsServiceUser() const;
const std::string& getHost() const;
unsigned int getPort() const;
bool getUseSsl() const;
// ...
private:
// ...
std::string m_marsServiceUser;
std::string m_host;
unsigned int m_port;
bool m_useSsl;
};
} // namespace options
} // namespace mars
} // namespace orchid
MarsOptions.cpp
#include "options/MarsOptions.hpp"
#include <options/Options.hpp>
namespace orchid {
namespace mars {
namespace options {
const std::string& MarsOptions::getMarsServiceUser() const
{
return m_marsServiceUser;
}
const std::string& MarsOptions::getHost() const
{
return m_host;
}
unsigned int MarsOptions::getPort() const
{
if (m_port != 0)
return m_port;
if (m_useSsl)
return 443;
return 80;
}
bool MarsOptions::getUseSsl() const
{
return m_useSsl;
}
void MarsOptions::initialize()
{
// TODO #5: Add options, as necessary.
using namespace rstudio::launcher_plugins::options;
Options& options = Options::getInstance();
options.registerOptions()
("mars-service-user",
Value<std::string>(m_marsServiceUser).setDefaultValue("mars"),
"a user which can make service requests to the Mars server")
("host",
Value<std::string>(m_host),
"the IP address or hostname of the Mars server")
("port",
Value<unsigned int>(m_port).setDefaultValue(0),
"the port to use for connecting to the Mars server")
("use-ssl",
Value<bool>(m_useSsl).setDefaultValue(false),
"whether to use HTTPS (true) or HTTP (false, default) when connecting to the Mars server");
}
} // namespace options
} // namespace mars
} // namespace orchid
4.6 TODO #6: Initialize Communication
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
The next step is to initialize communication with the job scheduling system. This might involve opening a TCP connection to the job scheduling system which will remain open for the duration of the Plugin’s lifetime, or it might involve testing that it is possible to connect to the job scheduling system. In the case of the sample Local Plugin, provided with this SDK, no communication test is necessary since jobs will be run on the local machine.
Any other tasks which need to be completed before the Plugin is ready to launch jobs must be completed here. For example, in the RStudio SLURM Launcher Plugin it was also necessary to check which timezone the SLURM cluster is using, since job timestamps need to be compared later on. If the Plugin implementation will be tied to a specific version, or set of versions, of the job scheduling system, that should be validated here as well.
4.6.1 Example
The Plugin Developer may now change
#include <QuickStartJobSource.hpp>
#include <Error.hpp>
namespace rstudio {
namespace launcher_plugins {
namespace quickstart {
Error QuickStartJobSource::initialize()
{
// TODO #6: Initialize communication with the job scheduling system. If communication fails, return an error.
return Success();
}
} // namespace quickstart
} // namespace launcher_plugins
} // namespace rstudio
in QuickStartJobSource.cpp
to
#include "MarsJobSource.hpp"
#include <Error.hpp>
#include "options/MarsOptions.hpp"
namespace orchid {
namespace mars {
Error MarsJobSource::initialize()
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
try
{
mars_api::init(opts.getHost(), opts.getPort(), opts.getUseSsl());
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
return Success();
}
} // namespace mars
} // namespace orchid
4.7 TODO #7: Define Cluster Configuration
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
RStudio applications which make use of the Launcher can determine the configuration and capabilities of each job scheduling system by submitting a Cluster Info request to the Launcher. Each Plugin is responsible for returning a complete and correct list of the configuration and capabilities of the job scheduling system with which it integrates. For more information about the Cluster Info request, see the RStudio Launcher Plugin SDK Developer Guide.
Some aspects of the configuration and capabilities may be independent of the end user or the Launcher, and others may differ by user. If any configuration or capability values should differ based on the end user, it is likely that these rules should be configurable by the system admin. For the convenience of the Plugin Developer, an AbstractUserProfiles class has been provided to facilitate the implementation of User Profiles. For more details about implementing this feature, please refer to the ‘User Profiles’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer Guide.
This section will describe the questions which need to be answered to correctly define the configuration and capabilities of a job scheduling system, and which value(s) of the JobSourceConfiguration structure should be modified in the overridden implementation of IJobSource::getConfiguration.
Does the job scheduling system support or require containers (such as Docker or Singularity containers)? If yes:
- set JobSourceConfiguration::ContainerConfig::SupportsContainers to true. Note that if JobSourceConfiguration::ContainerConfig::SupportsContainers is set to false, all other values of JobSourceConfiguration::ContainerConfig will be ignored.
- decide how the list of available container images should be determined and add each container image name to JobSourceConfiguration::ContainerConfig::ContainerImages.
- decide how the default container image should be determined, if any, and then set the value of JobSourceConfiguration::ContainerConfig::DefaultImage.
- decide whether unknown images should be allowed and set JobSourceConfiguration::ContainerConfig::AllowUnknownImages to true or false as desired.
> Info: The RStudio Kubernetes Launcher Plugin allows system administrators to control the list of available container images, the default image, and whether unknown images should be allowed on a per-user or per-group basis through the User Profiles feature. For an example of what that configuration looks like, see the sample launcher.kubernetes.profiles.conf file in the Job Launcher Plugin Configuration section of the RStudio Job Launcher Administrator’s Guide.
Does the job scheduling system support job queues? A job queue is a subset of machines within the job scheduling system. When a job is submitted to the job scheduling system, a specific queue may be requested and the job should be run on one or more of the machines in the requested queue. In some job scheduling systems, this may also be known as a “partition”. If yes:
- pull down the list of queues from the job scheduling system, or otherwise determine the list of available queues, and add each queue to JobSourceConfiguration::Queues.
Does the job scheduling system support setting resource limits on a job? If yes:
- determine the type of resource limits that are supported by the job scheduling system. The SDK supports the following resource types by default: Cpu Count, Cpu Time, Memory, and Memory Swap. More types can be added by setting the ResourceLimit::ResourceType field to the name of the desired type. For each type, add a ResourceLimit object of that type to JobSourceConfiguration::ResourceLimits, optionally setting the maximum and default values of that resource limit type.
Info: The existing RStudio Launcher Plugins allow system administrators to configure the maximum and default values for each resource type on a per-user or per-group basis through the User Profiles feature. For an example of what that looks like, see a sample launcher.<plugin name>.profiles.conf file in the Job Launcher Plugin Configuration section of the RStudio Job Launcher Administrator’s Guide.
Does the job scheduling system support other placement constraints that are important to surface? It is not necessary to surface every placement constraint supported by the job scheduling system; however, if any constraints are commonly used or otherwise important to the end user, they may be surfaced through this feature. Examples of such constraints may be other types of resource limits (e.g. disk space, GPU type, Processor Type, etc.), or other types of constraints entirely (e.g. disk type, region of a cloud service, etc.). Placement constraints can either be an enumeration style set of possible values or a free-form text value. If yes:
- for each enumeration-style placement constraint, add one PlacementConstraint object to JobSourceConfiguration::PlacementConstraints for each possible value of that constraint.
- for each free-form placement constraint, add one PlacementConstraint object to JobSourceConfiguration::PlacementConstraints with no value.
Important: Whether or not to use the system::User parameter provided to each of the capabilities methods is at the discretion of the Plugin Developer. If the Plugin Developer does not wish to support User Profiles and does not have another method to determine per-user or per-group rules, the system::User parameter may simply be ignored.
If there are any job configuration values which cannot be covered by one of the above categories, refer to the ‘Custom Job Source Configuration’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer’s Guide.
4.7.1 Example
Assume that the Mars scheduling system has the following capabilities:
- container support
- a way to list the container images that are available to a given user
- no default container image
- no way to specify unknown images
- a way to set a limit on the number of CPUs, the amount of Memory, and the number of GPUs, and a way to check the maximum possible values that can be requested
- a way to choose between the processor types: x86 or ARM
- optionally, a way to select which host(s) the Mars scheduler can use to run the job
- job queues
The Plugin developer can report this support by changing
Error QuickStartJobSource::getConfiguration(
const system::User& in_user,
api::JobSourceConfiguration& out_configuration) const
{
// TODO #7: Define cluster configuration.
return Success();
}
in QuickStartJobSource.cpp
to
Error MarsJobSource::getConfiguration(
const system::User& in_user,
api::JobSourceConfiguration& out_configuration) const
{
api::ContainerConfiguration& containerConfig = out_configuration.ContainerConfig;
containerConfig.SupportsContainers = true;
int maxGpuCount = 0, maxCpuCount = 0, maxMemoryMB = 0;
try
{
// Get the container images and the queues.
containerConfig.ContainerImages = mars_api::get_images(in_user.getUsername());
out_configuration.Queues = mars_api::get_queues();
// Get the maximum resource limit and placement constraint values.
maxGpuCount = mars_api::get_total_gpus();
maxCpuCount = mars_api::get_total_cpus();
maxMemoryMB = mars_api::get_max_memory();
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
// Populate the resource limits.
out_configuration.ResourceLimits.emplace_back(
api::ResourceLimit::Type::CPU_COUNT,
std::to_string(maxCpuCount));
out_configuration.ResourceLimits.emplace_back(
api::ResourceLimit::Type::MEMORY,
std::to_string((double)maxMemoryMB));
out_configuration.ResourceLimits.emplace_back(
"GPUs",
std::to_string(maxGpuCount),
"0");
// Populate the placement constraints
out_configuration.PlacementConstraints.emplace_back("Processor Type", "x86");
out_configuration.PlacementConstraints.emplace_back("Processor Type", "ARM");
out_configuration.PlacementConstraints.emplace_back("Processor Type", "Any");
out_configuration.PlacementConstraints.emplace_back("Hostname");
return Success();
}
in MarsJobSource.cpp.
4.8 TODO #8: Pull All Jobs
TODO Location | Impact |
---|---|
QuickStartJobRepository.cpp | QuickStartJobRepository.cpp |
When the Launcher starts the Plugin, it will send a Bootstrap request to ensure that the Plugin is initialized. During this request, the Plugin SDK will initialize the AbstractJobRepository, which will invoke the concrete implementation of AbstractJobRepository::loadJobs. The Plugin developer needs to override AbstractJobRepository::loadJobs to synchronously pull down the list of jobs that are already in the Job Scheduling System. The IJobSource will be initialized successfully before the Job Repository is initialized.
Any jobs which were not launched by the Plugin should not be included in the output. To distinguish between jobs that were launched by the Plugin and jobs that were launched either manually or by another tool, it may be useful to preface the name of the job with a unique tag (e.g. '[RStudio Launcher] <job name>'). Alternatively, if the Job Scheduling System supports setting custom fields on the job, a custom field may be used to indicate that the job was launched through the Plugin. Note that if a unique prefix is added to Launcher jobs by the Plugin, it should be removed from the job name before being returned to the Launcher.
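The tagging approach described above can be sketched with three small string helpers. All names here (LAUNCHER_TAG, addLauncherTag, isLauncherJob, stripLauncherTag) are hypothetical and not part of the SDK, and the exact tag text is an assumption.

```cpp
#include <string>

// Assumed tag; any string unlikely to collide with user-chosen job names would do.
const std::string LAUNCHER_TAG = "[RStudio Launcher] ";

// Applied to the job name just before submitting to the Job Scheduling System.
std::string addLauncherTag(const std::string& in_name)
{
   return LAUNCHER_TAG + in_name;
}

// True only if the scheduler-side name starts with the tag.
bool isLauncherJob(const std::string& in_schedulerName)
{
   return in_schedulerName.compare(0, LAUNCHER_TAG.size(), LAUNCHER_TAG) == 0;
}

// Restores the name the user originally submitted, before returning it to the Launcher.
std::string stripLauncherTag(const std::string& in_schedulerName)
{
   if (!isLauncherJob(in_schedulerName))
      return in_schedulerName;

   return in_schedulerName.substr(LAUNCHER_TAG.size());
}
```

Checking only the start of the name, rather than searching the whole name, avoids treating a user job whose name merely contains the tag text as a Launcher job.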
Each job detail must be converted from the format in the Job Scheduling System to the format understood by the Launcher. For example, if dates are represented as a string in the Job Scheduling System, they must be parsed as a system::DateTime before the associated Job object can be updated. For more information about parsing dates from strings, see the ‘Date-Time Support’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer’s Guide.
4.8.1 Job Fields
The Job object has a field for each detail that the Launcher requires. This document will discuss some of the fields which require special consideration. Because jobs may remain in the system between Plugin restarts, the Plugin must be able to populate each field of the Job object exactly as it would have been when it was submitted to the Plugin based on the record of the job in the Job Scheduling System. If any fields cannot be retrieved from the Job Scheduling System, it will most likely be easier to ignore them during this TODO and design a solution during TODO #12. For more information about each field, see the RStudio Launcher Plugin SDK API Reference.
4.8.1.1 Command vs. Exe vs. Container
Depending on whether a Plugin supports containers, there are two or three possible ways a job may be submitted to the Plugin. Without container support, a job may be submitted with either the Job::Command or the Job::Exe field set. The difference is that a job submitted with the Job::Command field populated should be executed as a shell command (e.g. /bin/sh -c <Command>), and a job submitted with the Job::Exe field populated should be executed directly. With container support, the job may be submitted with the Command and Container fields set, with the Exe and Container fields set, or just with the Container field set. In that case, the Container field describes the Container that should run either the shell command or the executable, or if no command or executable was set, the Container itself is the job to be run. Job submission is discussed in more detail in TODO #12.
The Plugin must be able to determine from the Job Scheduling System’s record of the job which of the Job::Command, Job::Exe, and Job::Container fields should be populated, and must be able to repopulate those fields exactly as they were when they were submitted to the Plugin.
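The combinations described above can be sketched as a small classification function. JobSketch is a simplified stand-in for the relevant Job fields, not the SDK's Job type, and LaunchMode and getLaunchMode are hypothetical names. Treating Command and Exe as mutually exclusive is an assumption drawn from the "either ... or" wording above.

```cpp
#include <stdexcept>
#include <string>

// Simplified stand-in for the relevant Job fields. The real Job object has many
// more fields and describes the container with a structure, not just a name.
struct JobSketch
{
   std::string Command;        // shell command, e.g. run via /bin/sh -c <Command>
   std::string Exe;            // executable, run directly
   std::string ContainerImage; // empty when no container was requested
};

enum class LaunchMode
{
   SHELL_COMMAND,
   EXECUTABLE,
   CONTAINER_SHELL_COMMAND,
   CONTAINER_EXECUTABLE,
   CONTAINER_ONLY
};

LaunchMode getLaunchMode(const JobSketch& in_job)
{
   const bool hasCommand = !in_job.Command.empty();
   const bool hasExe = !in_job.Exe.empty();
   const bool hasContainer = !in_job.ContainerImage.empty();

   // Assumption: a job never carries both a Command and an Exe.
   if (hasCommand && hasExe)
      throw std::invalid_argument("Command and Exe must not both be set");

   if (hasContainer)
   {
      if (hasCommand)
         return LaunchMode::CONTAINER_SHELL_COMMAND;
      if (hasExe)
         return LaunchMode::CONTAINER_EXECUTABLE;
      return LaunchMode::CONTAINER_ONLY; // the container itself is the job
   }

   if (hasCommand)
      return LaunchMode::SHELL_COMMAND;
   if (hasExe)
      return LaunchMode::EXECUTABLE;

   throw std::invalid_argument("One of Command, Exe, or Container must be set");
}
```

Recording the launch mode alongside the job in the Job Scheduling System (for example in a custom field, if one is available) is one way to make the original fields recoverable after a Plugin restart.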
4.8.1.2 Job Status
Another special consideration is how to convert from the Job Scheduling System’s job status to the RStudio Launcher Plugin SDK job status. To help with the conversion, below is a table which describes all of the Job::State enumeration values and their meaning with respect to the state of the job.
Job::State Value | Meaning |
---|---|
CANCELED | The job was canceled by the user before it transitioned to the RUNNING state. |
FAILED | The job could not be launched by the Job Scheduling System. This does not include a job that runs and exits with a non-zero exit code. |
FINISHED | The job was launched by the Job Scheduling System and has exited. This includes jobs which exit with a non-zero exit code. |
KILLED | The job was forcibly stopped, using SIGKILL. |
PENDING | The job has been submitted to the Job Scheduling System but has not started running yet. |
RUNNING | The job is currently running. |
SUSPENDED | The job has been paused by the user, and may be resumed later. |
UNKNOWN | This is not a job status - it represents the default value of the job state, before it has been submitted to the Plugin. Jobs populated by the Plugin should not have this status. |
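A status conversion based on the table above might be sketched as a lookup table. The State enumeration below is a stand-in that mirrors the Job::State values, schedulerStatusToState is a hypothetical helper, and the scheduler-side status strings are invented for illustration; a real Job Scheduling System will use its own vocabulary.

```cpp
#include <map>
#include <string>

// Stand-in that mirrors the Job::State values from the table above.
enum class State
{
   CANCELED, FAILED, FINISHED, KILLED, PENDING, RUNNING, SUSPENDED, UNKNOWN
};

// Converts an (invented) scheduler status string to a State.
// Returns false for unrecognized statuses instead of reporting UNKNOWN,
// because jobs populated by the Plugin should never have the UNKNOWN state.
bool schedulerStatusToState(const std::string& in_status, State& out_state)
{
   static const std::map<std::string, State> s_statusMap = {
      { "queued",        State::PENDING   },
      { "running",       State::RUNNING   },
      { "paused",        State::SUSPENDED },
      { "exited",        State::FINISHED  }, // any exit code, including non-zero
      { "launch_failed", State::FAILED    }, // the job never started
      { "killed",        State::KILLED    },
      { "canceled",      State::CANCELED  }
   };

   const auto itr = s_statusMap.find(in_status);
   if (itr == s_statusMap.end())
      return false;

   out_state = itr->second;
   return true;
}
```

The caller can then decide how to handle an unrecognized status, for example by logging an error and skipping that job's update.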
4.8.2 Example
Assume the following:
- The Mars Job Scheduling System API has a list_jobs function which takes a username. It retrieves the jobs the specified user has permission to see.
- The Orchid organization’s developer has added a marsJobToJob helper function which converts a mars_api::job structure to an rstudio::launcher_plugins::api::Job structure and has the signature rstudio::launcher_plugins::api::JobPtr marsJobToJob(const mars_api::job& in_marsJob).
- The Plugin developer plans to append "[RStudio Launcher]" to the Job name on submission.
In that case, the Plugin developer might change
Error QuickStartJobRepository::loadJobs(api::JobList& out_jobs) const
{
// TODO #8: Pull all RStudio jobs from the job scheduling system synchronously.
return Success();
}
in QuickStartJobRepository.cpp
to
Error MarsJobRepository::loadJobs(api::JobList &out_jobs) const
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
std::vector<mars_api::job> marsJobs;
try
{
marsJobs = mars_api::list_jobs(opts.getMarsServiceUser());
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
for (const mars_api::job& job: marsJobs)
if (job._name.find("[RStudio Launcher]") != std::string::npos)
out_jobs.push_back(marsJobToJob(job));
return Success();
}
in MarsJobRepository.cpp.
4.9 TODO #9: Poll Job Statuses
TODO Location | Impact |
---|---|
QuickStartJobStatusWatcher.cpp | QuickStartJobStatusWatcher.cpp |
Along with the next two TODOs (#10 and #11), this TODO will walk through how to set up a Job Status Watcher that polls the Job Scheduling System for Job status updates. If it is possible to stream Job status updates from the Job Scheduling System, follow the instructions in the ‘Streaming’ subsection of the ‘Job Status Updates’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer Guide and then skip to TODO #12.
Note that using a Job Status Watcher is optional. If neither polling nor streaming Job status updates is suitable, the Plugin developer may choose to implement the updating of Job statuses however best suits the Job Scheduling System’s available functionality. More details can be found in the ‘Job Status Updates’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer Guide.
The AbstractTimedJobStatusWatcher base class will invoke pollJobStatus once every configurable amount of time. When this method is invoked, the Plugin should poll the Job Scheduling System for all the current Job statuses. The SDK will handle verifying whether a status is new, as long as correct information is passed to the AbstractJobStatusWatcher::updateJobStatus method. The necessary information is as follows:
- The ID of the Job.
- The current status of the Job.
- The current status message of the Job, if any.
- The current time, or the time at which the Job was last modified, if available.
If clock skew between the Job Scheduling System and the Plugin is a concern, or if the Job Scheduling System provides the last modification time of the Job, it is advisable to provide the last modified time to the in_invocationTime parameter of updateJobStatus.
The pollJobStatus method should return an error if and only if it hits an unrecoverable error, such as failing to communicate with the Job Scheduling System. If there is an issue handling a single Job’s status update, it may be sufficient to log an error and try again the next time that pollJobStatus is invoked.
The Launcher supports load balancing, which means that it is possible that jobs can be submitted to the Job Scheduling System by another instance of the Plugin. Each instance of the Plugin should be able to correctly report all Jobs that were submitted to the Job Scheduling System through the Launcher. Usually, the most straightforward way to implement this is to pick up the new Jobs while polling or streaming Job status updates. If AbstractJobStatusWatcher::updateJobStatus is invoked for a Job that is not currently in the Job Repository, it will invoke AbstractJobStatusWatcher::getJobDetails. For more details about implementing AbstractJobStatusWatcher::getJobDetails, see TODO #10.
4.9.1 Example
In addition to the assumptions from previous examples, assume the following:
- The mars_api::job structure has a last_modified field which is a string in the format YYYY-MM-DDTHH:mm:ss.sssss[+-]HH:mm.
- The mars_api::job structure has a status field which is a string, and the Plugin developer has added a marsStatusToStatus method which converts the Mars job status to its applicable SDK status equivalent.
- The mars_api::job structure has a reason field which is a string describing the reason for the current status.
- The mars_api::job structure has an id field which is an unsigned integer, which the Plugin developer has mapped to the SDK Job::Id field by converting the integer value to a string.
In that case, the plugin developer might change
Error QuickStartJobStatusWatcher::pollJobStatus()
{
// TODO #9: Poll the Job Scheduling System for job status updates. Invoke AbstractJobStatusWatcher::updateJobStatus
// for each updated job.
return Success();
}
in QuickStartJobStatusWatcher.cpp
to
Error MarsJobStatusWatcher::pollJobStatus()
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
std::vector<mars_api::job> marsJobs;
try
{
marsJobs = mars_api::list_jobs(opts.getMarsServiceUser());
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
for (const mars_api::job& job: marsJobs)
{
if (job._name.find("[RStudio Launcher]") == std::string::npos)
continue; // Skip non-Launcher jobs.
system::DateTime lastModified;
Error error = system::DateTime::fromString(job.last_modified, lastModified);
if (error)
{
// Use the current time as the invocation time instead, but log an error.
logging::logError(error, ERROR_LOCATION);
error = updateJobStatus(
std::to_string(job.id),
marsStatusToStatus(job.status),
job.reason);
}
else
{
error = updateJobStatus(
std::to_string(job.id),
marsStatusToStatus(job.status),
job.reason,
lastModified);
}
if (error)
{
logging::logErrorMessage("Failed to update job " + std::to_string(job.id), ERROR_LOCATION);
logging::logError(error, ERROR_LOCATION);
}
}
return Success();
}
in MarsJobStatusWatcher.cpp.
4.10 TODO #10: Get the Missing Job Details
TODO Location | Impact |
---|---|
QuickStartJobStatusWatcher.cpp | QuickStartJobStatusWatcher.cpp |
If AbstractJobStatusWatcher::updateJobStatus is invoked for a Job which is not in the Job Repository, the SDK will call AbstractJobStatusWatcher::getJobDetails to populate a Job object with all of the details of the Job, and then add it to the Job Repository. Since AbstractJobStatusWatcher::getJobDetails should get the full details of the Job, just like IJobSource::getJobs() does, it may be advisable to make a shared implementation for getting and/or parsing Jobs from the Job Scheduling System.
This method should return an error if it is unable to populate the Job details.
4.10.1 Example
In addition to the assumptions from previous examples, assume the following:
- The mars_api has a list_job function which lists a specific Job by ID.
In that case, the plugin developer might change
Error QuickStartJobStatusWatcher::getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const
{
// TODO #10: Get the full details of the requested job from the Job Scheduling System, and remove the placeholder
// error below.
return Error(
"NotImplemented",
1,
"Method QuickStartJobStatusWatcher::getJobDetails is not implemented.", ERROR_LOCATION);
}
in QuickStartJobStatusWatcher.cpp
to
Error MarsJobStatusWatcher::getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
mars_api::job marsJob;
try
{
unsigned long id = std::stoul(in_jobId);
marsJob = mars_api::list_job(opts.getMarsServiceUser(), id);
}
catch (const std::invalid_argument& e)
{
return Error("InternalError", 1, e.what(), ERROR_LOCATION);
}
catch (const std::out_of_range& e)
{
return Error("InternalError", 2, e.what(), ERROR_LOCATION);
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
// This should only be invoked for Launcher Jobs because of the filtering in pollJobStatus,
// so return an error if somehow it's not a Launcher Job.
if (marsJob._name.find("[RStudio Launcher]") == std::string::npos)
return Error("JobNotFound", 1, "Job " + in_jobId + " is not an RStudio Launcher Job.", ERROR_LOCATION);
out_job = marsJobToJob(marsJob);
return Success();
}
4.11 TODO #11: Adjust the Polling Frequency
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
The QuickStartJobStatusWatcher doesn’t actually update any Job statuses, as there is no QuickStart Job Scheduling System to pull jobs from. To reduce the amount of busy-work the QuickStart Plugin will do when run, the polling frequency of the Job Status Watcher is set to 1 minute. In reality, Job statuses should be kept much more up to date, on the order of seconds rather than minutes.
The Plugin developer may choose a fixed interval, such as 5 seconds, or allow the RStudio Workbench administrator to decide the number of seconds using a configuration option.
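If the interval is made configurable, the configured value probably needs a default and sensible bounds. The sketch below shows one way to do that with std::chrono; resolvePollInterval, the 5 second default, and the 1 to 60 second bounds are all assumptions made for this example, and the result would still need to be converted to the SDK's system::TimeDuration type.

```cpp
#include <algorithm>
#include <chrono>

// Hypothetical default and bounds, chosen to keep polling "on the order of seconds".
constexpr unsigned int DEFAULT_POLL_SECONDS = 5;
constexpr unsigned int MIN_POLL_SECONDS = 1;
constexpr unsigned int MAX_POLL_SECONDS = 60;

// Converts a configured number of seconds into a polling interval.
// A value of 0 is treated as "not configured" and falls back to the default.
std::chrono::seconds resolvePollInterval(unsigned int in_configuredSeconds)
{
   if (in_configuredSeconds == 0)
      return std::chrono::seconds(DEFAULT_POLL_SECONDS);

   const unsigned int bounded =
      std::min(std::max(in_configuredSeconds, MIN_POLL_SECONDS), MAX_POLL_SECONDS);
   return std::chrono::seconds(bounded);
}
```

Bounding the value guards against a misconfiguration that would either flood the Job Scheduling System with requests or leave Job statuses stale for long periods.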
4.11.1 Example
The Orchid Organization’s Plugin developer may choose to change
QuickStartJobSource::QuickStartJobSource(
const jobs::JobRepositoryPtr& in_jobRepository,
const jobs::JobStatusNotifierPtr& in_jobStatusNotifier) :
api::IJobSource(in_jobRepository, in_jobStatusNotifier),
m_jobStatusWatcher(
new QuickStartJobStatusWatcher(
system::TimeDuration::Minutes(1),
in_jobRepository,
in_jobStatusNotifier))
{
// TODO #11: Adjust the job status watcher frequency.
}
in QuickStartJobSource.cpp
to
MarsJobSource::MarsJobSource(
const jobs::JobRepositoryPtr& in_jobRepository,
const jobs::JobStatusNotifierPtr& in_jobStatusNotifier) :
api::IJobSource(in_jobRepository, in_jobStatusNotifier),
m_jobStatusWatcher(
new MarsJobStatusWatcher(
system::TimeDuration::Seconds(5),
in_jobRepository,
in_jobStatusNotifier))
{
}
in MarsJobSource.cpp.
4.12 TODO #12: Submit a Job
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
When a Job is submitted to the Plugin, the Plugin will be provided with a Job object which represents the details of the job. The Plugin must pass the necessary details of the Job object to the Job Scheduling System. In addition, the Plugin is responsible for making the following updates to the Job object:
- Set the ID of the Job. If the Job Scheduling System does not generate an ID, the Plugin must generate a unique ID for the Job and correctly associate it with the same Job in the Job Scheduling System.
- Set the SubmissionTime field of the Job object. This may be either the time that the Plugin invoked the Job Scheduling System’s submission method or the time that the Job Scheduling System reports that the Job was submitted, if available.
The SDK will set the initial Job Status to Job::State::PENDING if no Error is returned from IJobSource::submitJob. Additional Job Status updates must be implemented by the Plugin. Ideally, this would be implemented through Job Status Streaming, which is discussed in detail in the ‘Job Status Updates’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer’s Guide.
When a Job is updated in the JobStatusNotifier
for the first time, it gets added to the AbstractJobRepository
. If the Plugin requires any special handling for newly added Jobs, or for Jobs that are being pruned, this functionality can be added in the concrete implementation of the AbstractJobRepository
. For more detail see the ‘Customizing the Job Repository’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer’s Guide.
4.12.1 Error Handling
The Plugin should also validate the Job
object before submitting the Job to the Job Scheduling System. For example, the Job Scheduling System may only support the NFS mount type. If there are any Host mounts set on the Job
it will not be rejected by the SDK layer, as the SDK has no knowledge of what mount types are supported by the Job Scheduling System. In that case, the Plugin should set the out_wasInvalidRequest
parameter to true
, since the user requested an unsupported mount type, and return an Error
object. The Error
’s category name, error code, and message will be returned to the Launcher to be sent back to the caller.
If the Job object was valid but fails to launch for another reason, the Plugin should not set out_wasInvalidRequest
. The SDK layer sets the initial value of the variable to false
.
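The distinction between an invalid request and any other failure can be sketched as follows. The `SimpleJob`, `SimpleError`, `MountType`, and `submitToScheduler` names are simplified stand-ins invented for this illustration; they are not the SDK's `Job` and `Error` types.

```cpp
#include <string>
#include <vector>

// Simplified stand-ins for the SDK types, for illustration only.
enum class MountType { NFS, HOST };

struct SimpleJob
{
   std::vector<MountType> Mounts;
};

struct SimpleError
{
   SimpleError() : IsError(false) {}
   explicit SimpleError(const std::string& in_message) :
      IsError(true), Message(in_message) {}

   bool IsError;
   std::string Message;
};

// Pretend call into the Job Scheduling System; fails if it is unreachable.
SimpleError submitToScheduler(bool in_schedulerUp)
{
   if (!in_schedulerUp)
      return SimpleError("The scheduler is unreachable.");
   return SimpleError();
}

SimpleError submitJob(
   const SimpleJob& in_job,
   bool in_schedulerUp,
   bool& out_wasInvalidRequest)
{
   // Validation failures are caused by the request itself, so flag them.
   for (MountType type: in_job.Mounts)
   {
      if (type != MountType::NFS)
      {
         out_wasInvalidRequest = true;
         return SimpleError("Only NFS mounts are supported.");
      }
   }

   // Any later failure is not the caller's fault: leave the flag untouched.
   return submitToScheduler(in_schedulerUp);
}
```

Because the flag starts out as false, only the validation branch needs to write to it.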
4.12.2 Example
Recall from Example #7 that the Mars scheduling system supports containers, queues, resource limits (CPU, Memory, and GPU), a way to select the processor type (x86
or ARM
), and a way to select which specific host(s) may run the job. Assume that the Mars job scheduling system also supports mounting an NFS path on job start-up. Finally, assume that the mars_api
exposes a function with the following signature:
mars_api::job submit_job(
const char* exe,
const char** args,
size_t argc,
const char* stdin,
mars_api::job_opts opts);
Where mars_api::job_opts
is defined as follows:
enum proc_type
{
PROC_TYPE_X86,
PROC_TYPE_ARM,
PROC_TYPE_ANY
};
struct mount
{
std::string source;
std::string dest;
std::string host;
bool read_only;
};
struct job_opts
{
job_opts() :
max_gpus(0),
min_gpus(0),
max_cpus(0),
min_cpus(0),
max_mem_mb(0),
min_mem_mb(0),
processor_type(PROC_TYPE_ANY)
{}
std::string name;
std::string comment;
std::string stdout_file;
std::string stderr_file;
std::vector<std::string> queues;
std::vector<std::string> hosts;
std::vector<mount> mounts;
int max_gpus;
int min_gpus;
int max_cpus;
int min_cpus;
int max_mem_mb;
int min_mem_mb;
proc_type processor_type;
};
The Orchid organization’s Plugin developer knows from the Mars API documentation that a resource limit with a value of 0
means that there is no limit. The Plugin developer also knows that the comment field can be any UTF-8 text and does not have a maximum length. Then the developer might change
Error QuickStartJobSource::submitJob(api::JobPtr io_job, bool& out_wasInvalidRequest) const
{
// TODO #12: Submit and then update the job.
out_wasInvalidRequest = true;
return Error(
"NotImplemented",
1,
"Method QuickStartJobSource::submitJob is not implemented.",
ERROR_LOCATION);
}
in QuickStartJobSource.cpp
to
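void replaceSpecialChars(std::string& io_str)
{
// std::regex_replace returns the result instead of modifying its input, so assign it back.
// Replace % first so that added %s won't also be replaced.
io_str = std::regex_replace(io_str, std::regex("%"), "%p");
io_str = std::regex_replace(io_str, std::regex("="), "%e");
io_str = std::regex_replace(io_str, std::regex(","), "%c");
io_str = std::regex_replace(io_str, std::regex(";"), "%s");
io_str = std::regex_replace(io_str, std::regex("\n"), "%n");
io_str = std::regex_replace(io_str, std::regex("\r"), "%r");
// | is the regex alternation operator, so it must be escaped to match literally.
io_str = std::regex_replace(io_str, std::regex("\\|"), "%b");
}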
Error MarsJobSource::submitJob(api::JobPtr io_job, bool& out_wasInvalidRequest) const
{
// Create the job_opts
mars_api::job_opts opts;
opts.name = io_job->Name;
std::string stdInp = "#!/bin/sh\n";
// Convert the mounts. If any are not NFS mounts, exit with an error.
for (const auto& mount: io_job->Mounts)
{
if (!mount.NfsSourcePath)
{
out_wasInvalidRequest = true;
return Error("MarsPluginError", 1, "Mars Launcher Plugin only supports NFS Mounts.", ERROR_LOCATION);
}
// Because of the check above, .getValueOr is guaranteed to return a non-empty mount source.
NfsMountSource mountSrc = mount.NfsSourcePath.getValueOr(NfsMountSource());
mars_api::mount marsMount;
marsMount.dest = mount.DestinationPath;
marsMount.read_only = mount.IsReadOnly;
marsMount.source = mountSrc.Path;
marsMount.host = mountSrc.Host;
opts.mounts.push_back(marsMount);
}
// Set up the resource limits. There shouldn't be any unsupported types, but
// if there are, exit with an error.
for (const auto& limit: io_job->ResourceLimits)
{
// For each resource type, request exactly the amount set by the user.
if (limit.ResourceType == ResourceLimit::Type::CPU_COUNT)
{
opts.min_cpus = std::stoi(limit.Value);
opts.max_cpus = opts.min_cpus;
}
else if (limit.ResourceType == ResourceLimit::Type::MEMORY)
{
opts.min_mem_mb = std::stoi(limit.Value);
opts.max_mem_mb = opts.min_mem_mb;
}
else if (limit.ResourceType == "GPUs")
{
opts.min_gpus = std::stoi(limit.Value);
opts.max_gpus = opts.min_gpus;
}
else
{
out_wasInvalidRequest = true;
return Error(
"MarsPluginError",
2,
"Mars Launcher Plugin does not support the requested ResourceLimit type: " + limit.ResourceType,
ERROR_LOCATION);
}
}
// Copy queues.
std::copy(io_job->Queues.begin(), io_job->Queues.end(), std::back_inserter(opts.queues));
// Save the environment and the tags in the job comment.
std::string tagStr;
for (std::string tag: io_job->Tags)
{
if (!tagStr.empty())
tagStr.append(",");
replaceSpecialChars(tag);
tagStr.append(tag);
}
if (!tagStr.empty())
opts.comment.append("tags=").append(tagStr).append(";");
// Also set up the export lines for the shell script that will be passed to
// stdin.
std::string envStr;
for (const auto& env: io_job->Environment)
{
if (!envStr.empty())
envStr.append(",");
std::string key = env.first;
std::string val = env.second;
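// Shell-escape the value so that spaces and special characters survive the export.
stdInp.append("export ").append(key).append("=").append(system::process::shellEscape(val)).append("\n");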
replaceSpecialChars(key);
replaceSpecialChars(val);
envStr.append(key).append("|").append(val);
}
if (!envStr.empty())
opts.comment.append("env=").append(envStr).append(";");
// Get the hosts
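std::string hostsStr = io_job->getJobConfigValue("Hostname").getValueOr("");
// std::sregex_token_iterator cannot be constructed from a temporary std::regex, so use a named one.
const std::regex commaRegex(",");
std::sregex_token_iterator itr(hostsStr.begin(), hostsStr.end(), commaRegex, -1);
std::copy(itr, std::sregex_token_iterator(), std::back_inserter(opts.hosts));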
// Get the processor type.
std::string procType = io_job->getJobConfigValue("Processor Type").getValueOr("");
if (procType == "x86")
opts.processor_type = mars_api::PROC_TYPE_X86;
else if (procType == "ARM")
opts.processor_type = mars_api::PROC_TYPE_ARM;
// Set the output files.
opts.stdout_file = io_job->StandardOutFile;
opts.stderr_file = io_job->StandardErrFile;
// Add the command/exe and arguments to the script. Jobs with both or no
// command/exe will be rejected by the SDK layer.
if (!io_job->Command.empty())
stdInp.append(system::process::shellEscape(io_job->Command));
else
stdInp.append(system::process::shellEscape(io_job->Exe));
for (const std::string& arg: io_job->Arguments)
stdInp.append(" ").append(system::process::shellEscape(arg));
// Now add the real stdin, if any. Escape the here-document start with " to
// ensure the stdin is interpreted literally.
if (!io_job->StandardIn.empty())
stdInp.append("<< \"EOF\"\n").append(io_job->StandardIn).append("\nEOF\n");
LOCK_JOB(io_job)
{
// Now submit the job!
io_job->SubmissionTime = system::DateTime(); // Set the submission time to now
mars_api::job marsJob;
try
{
marsJob = mars_api::submit_job("/bin/sh", { }, 0, stdInp.c_str(), opts);
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
// Success! Update the ID and status and then exit.
// Set the job ID
io_job->Id = marsJob.id;
io_job->Status = marsStatusToStatus(marsJob.state);
}
END_LOCK_JOB
return Success();
}
in MarsJobSource.cpp
.
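Notice that the Mars API did not provide a way to set environment variables for the job. To compensate, the Plugin developer wrapped the requested command in a shell script that exports the requested environment variables before running the command, and passed that script to /bin/sh on standard input. Any standard input requested for the Job is supplied to the command through a here-document.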
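If the Plugin later needs to read the tags and environment back out of the job comment, the escaping has to be reversible. The sketch below shows a single-pass variant of the escaping scheme used in the example together with its inverse. Both function names are hypothetical, and the idea that the Plugin reads the comment back is an assumption, not something the Mars example requires.

```cpp
#include <cstddef>
#include <string>

// Escapes the delimiter characters in one pass, so the order of replacement
// cannot matter.
std::string escapeSpecialChars(const std::string& in_str)
{
   std::string result;
   for (char c: in_str)
   {
      switch (c)
      {
         case '%':  result += "%p"; break;
         case '=':  result += "%e"; break;
         case ',':  result += "%c"; break;
         case ';':  result += "%s"; break;
         case '\n': result += "%n"; break;
         case '\r': result += "%r"; break;
         case '|':  result += "%b"; break;
         default:   result += c;    break;
      }
   }
   return result;
}

// Reverses escapeSpecialChars. Unknown or truncated sequences are kept as-is.
std::string unescapeSpecialChars(const std::string& in_str)
{
   std::string result;
   for (std::size_t i = 0; i < in_str.size(); ++i)
   {
      if ((in_str[i] != '%') || (i + 1 == in_str.size()))
      {
         result += in_str[i];
         continue;
      }

      const char code = in_str[++i];
      switch (code)
      {
         case 'p': result += '%';  break;
         case 'e': result += '=';  break;
         case 'c': result += ',';  break;
         case 's': result += ';';  break;
         case 'n': result += '\n'; break;
         case 'r': result += '\r'; break;
         case 'b': result += '|';  break;
         default:  result += '%'; result += code; break;
      }
   }
   return result;
}
```

After splitting the comment on `;`, `,`, `=`, and `|`, each key and value can be passed through `unescapeSpecialChars` to recover the original text.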
4.13 TODO #13: Create an Output Stream
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
When a request is made to stream job output, the SDK will retrieve the relevant Job
object, validate the authorization of the user to access the specified Job, and then invoke IJobSource::createOutputStream
with the appropriate parameters. The Plugin is then responsible for creating an AbstractOutputStream
object which will stream the correct type of job output. If the job output will always be written to a file on a shared location, the Plugin may simply construct a FileOutputStream
object and return it to the SDK.
If there is need to skip some lines of output or in some way further process the Job output, but it can still be read from a file, it is possible to customize the behavior of the base FileOutputStream
class by inheriting from it. For more details, please refer to the ‘Customizing the File Output Stream’ subsection of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer’s Guide.
If it is not possible to stream the output from a file, please refer to the ‘Custom Output Streams’ section of the ‘Advanced Features’ chapter of the RStudio Launcher Plugin SDK Developer’s Guide for details on how to create a completely custom output stream class.
4.13.1 Example
Suppose that the Mars Job Scheduling System job output is being written to a file and that the Mars Launcher Plugin will have a requirement that job output be written to a shared location. In that case the Plugin developer may decide to create a FileOutputStream
object to stream the job output. They may change
Error QuickStartJobSource::createOutputStream(
api::OutputType in_outputType,
api::JobPtr in_job,
api::AbstractOutputStream::OnOutput in_onOutput,
api::AbstractOutputStream::OnComplete in_onComplete,
api::AbstractOutputStream::OnError in_onError,
api::OutputStreamPtr& out_outputStream)
{
// TODO #13: Create an output stream.
return Error(
"NotImplemented",
2,
"Method QuickStartJobSource::createOutputStream is not implemented.",
ERROR_LOCATION);
}
in QuickStartJobSource.cpp
to
Error MarsJobSource::createOutputStream(
api::OutputType in_outputType,
api::JobPtr in_job,
api::AbstractOutputStream::OnOutput in_onOutput,
api::AbstractOutputStream::OnComplete in_onComplete,
api::AbstractOutputStream::OnError in_onError,
api::OutputStreamPtr& out_outputStream)
{
out_outputStream.reset(new api::FileOutputStream(in_outputType, in_job, in_onOutput, in_onComplete, in_onError));
return Success();
}
in MarsJobSource.cpp
.
4.14 TODO #14: Get Network Information
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
Some use cases of the Launcher may require the ability to communicate with a remotely launched job. For example, when using the Launcher to run RStudio R Sessions, RStudio Workbench needs to be able to communicate with the R Session over TCP. To facilitate this, the Plugin must be able to provide network information for each Job.
The Plugin developer should implement IJobSource::getNetworkInfo
so that it populates the out_networkInfo
object with the hostname and IP addresses of the host running the specified Job.
Note that the in_job
object provided to IJobSource::getNetworkInfo
should already have the Host
field populated because a Job must have been successfully submitted to the Job Scheduling System in order for a network info request to be valid.
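If the Job Scheduling System reports only a hostname, the Plugin may need to look up the IP addresses itself. The following is a minimal sketch of such a lookup using the POSIX `getaddrinfo` function. The `resolveIpAddresses` helper is hypothetical and not part of the SDK, and it assumes that the Plugin host can resolve the names of the compute nodes.

```cpp
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <string>
#include <vector>

// Hypothetical helper: resolves a hostname (or numeric address) to its
// textual IP addresses. Returns an empty vector if resolution fails.
std::vector<std::string> resolveIpAddresses(const std::string& in_hostname)
{
   std::vector<std::string> addresses;

   addrinfo hints = {};
   hints.ai_family = AF_UNSPEC;     // Accept both IPv4 and IPv6.
   hints.ai_socktype = SOCK_STREAM; // Avoid one duplicate per socket type.

   addrinfo* results = nullptr;
   if (getaddrinfo(in_hostname.c_str(), nullptr, &hints, &results) != 0)
      return addresses;

   for (const addrinfo* itr = results; itr != nullptr; itr = itr->ai_next)
   {
      char buffer[INET6_ADDRSTRLEN] = {};
      const void* rawAddr = nullptr;
      if (itr->ai_family == AF_INET)
         rawAddr = &reinterpret_cast<const sockaddr_in*>(itr->ai_addr)->sin_addr;
      else if (itr->ai_family == AF_INET6)
         rawAddr = &reinterpret_cast<const sockaddr_in6*>(itr->ai_addr)->sin6_addr;

      if ((rawAddr != nullptr) &&
          (inet_ntop(itr->ai_family, rawAddr, buffer, sizeof(buffer)) != nullptr))
      {
         addresses.push_back(buffer);
      }
   }

   freeaddrinfo(results);
   return addresses;
}
```

The returned strings could then be copied into the IP address list of the `out_networkInfo` object.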
4.14.1 Example
Assume that the Plugin developer set the Host
field on the Job object to the name of the Mars node running the Job, and that the name of the Mars node may or may not be the actual hostname of the machine running the job. Also assume that there is a mars_api
method that returns information about a node with the following signature:
struct node
{
std::string hostname;
std::vector<std::string> addresses;
int avail_cpus;
int avail_mem_mb;
int avail_gpus;
proc_type processor_type;
};
mars_api::node node_info(const char* node_name);
In that case, the developer might change
Error QuickStartJobSource::getNetworkInfo(api::JobPtr in_job, api::NetworkInfo& out_networkInfo) const
{
// TODO #14: Get the network information of the specified job.
return Success();
}
in QuickStartJobSource.cpp
to
Error MarsJobSource::getNetworkInfo(api::JobPtr in_job, api::NetworkInfo& out_networkInfo) const
{
mars_api::node nodeInfo;
try
{
nodeInfo = mars_api::node_info(in_job->Host.c_str());
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
out_networkInfo.Hostname = nodeInfo.hostname;
std::copy(nodeInfo.addresses.begin(), nodeInfo.addresses.end(), std::back_inserter(out_networkInfo.IpAddresses));
return Success();
}
in MarsJobSource.cpp
.
4.15 TODO #15: Control Jobs
TODO Location | Impact |
---|---|
QuickStartJobSource.cpp | QuickStartJobSource.cpp |
When a request is made to control the state of a Job, the SDK will first validate the state of the job and then invoke the appropriate method on IJobSource
. If the Job is not in an appropriate state for the control operation that was requested, the SDK will return an error to the Launcher without invoking any methods on the IJobSource
object.
The following table describes the possible control job operations, the method that will be invoked for each, the equivalent POSIX signal, and the list of states that a Job can be in for that operation to be valid.
Operation | IJobSource method | Equivalent POSIX Signal | Valid Job States |
---|---|---|---|
Cancel | cancelJob | N/A | PENDING |
Kill | killJob | SIGKILL | RUNNING |
Resume | resumeJob | SIGCONT | SUSPENDED |
Stop | stopJob | SIGTERM | RUNNING |
Suspend | suspendJob | SIGSTOP | RUNNING |
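The state check described by the table can be expressed as a small function. This is only an illustration of the rules above: the SDK performs this validation itself, and the `Operation` and `State` enumerations here are stand-ins, not the SDK's types.

```cpp
// Stand-in enumerations for illustration; not the SDK's types.
enum class Operation { CANCEL, KILL, RESUME, STOP, SUSPEND };
enum class State { PENDING, RUNNING, SUSPENDED, FINISHED };

// Returns true if the control operation is valid for a Job in the given
// state, following the table of valid Job states.
bool isValidStateFor(Operation in_operation, State in_state)
{
   switch (in_operation)
   {
      case Operation::CANCEL:
         return in_state == State::PENDING;
      case Operation::KILL:
      case Operation::STOP:
      case Operation::SUSPEND:
         return in_state == State::RUNNING;
      case Operation::RESUME:
         return in_state == State::SUSPENDED;
   }
   return false;
}
```

A helper like this can be handy in Plugin unit tests that need to mimic which requests would actually reach the `IJobSource` methods.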
The Plugin is not required to support any of these operations. For any of the methods that the Plugin does not support, the Plugin should return false
. If the Plugin returns false
for one of these methods, the SDK will provide a default "Operation not supported"
error message. However, the Plugin developer may choose to provide a more detailed error message by setting the out_statusMessage
variable.
If the Plugin does support a given operation, the Plugin should return true
from the method. The Plugin can indicate the success of the operation via the out_isComplete
parameter by setting it to true
on a successful operation and false
on a failed operation. In any case, the Plugin may provide more details about the success or failure of the operation by setting the out_statusMessage
variable.
Note that the SDK will acquire the JobLock
of the relevant Job
object before invoking the appropriate IJobSource
method, so the Plugin implementation is free to modify the state of the job without acquiring the JobLock
itself. The Plugin should modify the state of the Job
when the operation is finished. In some cases, that may mean immediately modifying the state of the Job after the operation is invoked. In other cases, that may mean updating the state of the Job when the change is reflected in the Job Scheduling System.
4.15.1 Example
Assume that the mars_api
provides a way to cancel a job via the function int cancel_job(const char* id)
, a way to suspend a running job via the function int suspend_job(const char* id)
, a way to resume a suspended job via int resume_job(const char* id)
, and a way to stop a job via int stop_job(const char* id, bool force = false)
. In that case, the developer might change
bool QuickStartJobSource::cancelJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
// TODO #15: Cancel a pending job.
out_isComplete = false;
out_statusMessage = "Cancel job is not supported.";
return false;
}
// ...
bool QuickStartJobSource::killJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
// TODO #15: Kill a running job.
out_isComplete = false;
out_statusMessage = "Kill job is not supported.";
return false;
}
bool QuickStartJobSource::resumeJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
// TODO #15: Resume a suspended job.
out_isComplete = false;
out_statusMessage = "Resume job is not supported.";
return false;
}
bool QuickStartJobSource::stopJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
// TODO #15: Stop a running job.
out_isComplete = false;
out_statusMessage = "Stop job is not supported.";
return false;
}
bool QuickStartJobSource::suspendJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
// TODO #15: Suspend a running job.
out_isComplete = false;
out_statusMessage = "Suspend job is not supported.";
return false;
}
in QuickStartJobSource.cpp
to
bool MarsJobSource::cancelJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
int ret = -1;
try
{
ret = mars_api::cancel_job(in_job->Id.c_str());
out_isComplete = (ret == 0);
// The job status watcher will update the job status to CANCELED when it shows as canceled in the Mars system.
if (!out_isComplete)
out_statusMessage = "Cancel job " + in_job->Id + " failed with code " + std::to_string(ret);
}
catch (const mars_api::mars_exception& e)
{
out_isComplete = false;
out_statusMessage = "Cancel job " + in_job->Id + " failed " + e.what();
}
return true;
}
// ...
bool MarsJobSource::killJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
int ret = -1;
try
{
// Stop the job forcefully.
ret = mars_api::stop_job(in_job->Id.c_str(), true);
out_isComplete = (ret == 0);
// Set the job status to KILLED so that it doesn't incorrectly show as FINISHED
if (out_isComplete)
in_job->Status = api::Job::State::KILLED;
else
out_statusMessage = "Kill job " + in_job->Id + " failed with code " + std::to_string(ret);
}
catch (const mars_api::mars_exception& e)
{
out_isComplete = false;
out_statusMessage = "Kill job " + in_job->Id + " failed " + e.what();
}
return true;
}
bool MarsJobSource::resumeJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
int ret = -1;
try
{
ret = mars_api::resume_job(in_job->Id.c_str());
out_isComplete = (ret == 0);
// The job status watcher will update the job status to RUNNING when it shows as running in the Mars system.
if (!out_isComplete)
out_statusMessage = "Resume job " + in_job->Id + " failed with code " + std::to_string(ret);
}
catch (const mars_api::mars_exception& e)
{
out_isComplete = false;
out_statusMessage = "Resume job " + in_job->Id + " failed " + e.what();
}
return true;
}
bool MarsJobSource::stopJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
int ret = -1;
try
{
ret = mars_api::stop_job(in_job->Id.c_str());
out_isComplete = (ret == 0);
// The job status watcher will update the job status to FINISHED when it shows as completed in the Mars system.
if (!out_isComplete)
out_statusMessage = "Stop job " + in_job->Id + " failed with code " + std::to_string(ret);
}
catch (const mars_api::mars_exception& e)
{
out_isComplete = false;
out_statusMessage = "Stop job " + in_job->Id + " failed " + e.what();
}
return true;
}
bool MarsJobSource::suspendJob(api::JobPtr in_job, bool& out_isComplete, std::string& out_statusMessage)
{
int ret = -1;
try
{
ret = mars_api::suspend_job(in_job->Id.c_str());
out_isComplete = (ret == 0);
// The job status watcher will update the job status to SUSPENDED when it shows as suspended in the Mars system.
if (!out_isComplete)
out_statusMessage = "Suspend job " + in_job->Id + " failed with code " + std::to_string(ret);
}
catch (const mars_api::mars_exception& e)
{
out_isComplete = false;
out_statusMessage = "Suspend job " + in_job->Id + " failed " + e.what();
}
return true;
}
in MarsJobSource.cpp
.
4.16 TODO #16: Create a Resource Utilization Stream
TODO Location | Impact |
---|---|
QuickStartResourceStream.cpp | QuickStartResourceStream.cpp, QuickStartResourceStream.hpp |
When a request is made to stream resource utilization metrics for a Job, the SDK will retrieve the relevant Job
object, validate the authorization of the user to access the specified Job, and then invoke IJobSource::createResourceStream
with the appropriate parameters. The Plugin is then responsible for creating an AbstractResourceStream
object which will stream any resource utilization metrics that it is able to access. Supported resources are CPU Percentage, elapsed CPU Time in seconds, Resident Set size (or Physical Memory size) in MB, and Virtual Memory size in MB.
Streaming resource utilization metrics is a best effort operation. If any of the resource utilization metrics cannot be retrieved through the Job Scheduling System, the Plugin may simply omit them from each response. If it is not possible to retrieve any resource utilization metrics through the Job Scheduling System, the Plugin may simply not post any updates. When the Job enters a completed state, the SDK will send a response to the Launcher to indicate that the stream has completed. The provided QuickStartResourceStream
class already fulfills that requirement for a Job Scheduling System that does not support resource utilization measurement.
The provided sample Local Launcher Plugin implements resource utilization by inheriting from AbstractTimedResourceStream
because the data must be polled from the Operating System periodically. The example below will show how the AbstractResourceStream
might be inherited when it is possible to stream the necessary information from the Job Scheduling System.
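When utilization data is sampled periodically, CPU percentage is usually derived from the change between two consecutive samples, and memory sizes reported in bytes have to be converted to MB. The following is a minimal sketch of both calculations. The `UsageSample` struct and both function names are hypothetical and not part of the SDK.

```cpp
#include <cstdint>

// Hypothetical raw sample: cumulative CPU time and elapsed run time.
struct UsageSample
{
   std::uint64_t CpuSeconds;
   std::uint64_t WallSeconds;
};

// Returns the CPU usage between two samples as a percentage, or -1.0 if it
// cannot be computed (no time has elapsed, or the counters went backwards).
double cpuPercentBetween(const UsageSample& in_previous, const UsageSample& in_current)
{
   if ((in_current.WallSeconds <= in_previous.WallSeconds) ||
       (in_current.CpuSeconds < in_previous.CpuSeconds))
   {
      return -1.0;
   }

   const double cpuDelta =
      static_cast<double>(in_current.CpuSeconds - in_previous.CpuSeconds);
   const double wallDelta =
      static_cast<double>(in_current.WallSeconds - in_previous.WallSeconds);
   return (cpuDelta / wallDelta) * 100.0;
}

// Converts a size in bytes to MB (1 MB = 1024 * 1024 bytes).
double bytesToMegabytes(std::uint64_t in_bytes)
{
   return static_cast<double>(in_bytes) / (1024.0 * 1024.0);
}
```

Note that a Job using more than one core can legitimately report more than 100 percent with this calculation.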
4.16.1 Example
Assume that the mars_api
exposes a function with the following signature:
struct stream_handle;
std::shared_ptr<stream_handle> job_stat_stream(
const char* id,
int freq,
std::function<void(int, const mars_api::job_stat*)> on_update);
Where id
is the ID of the job for which to stream job resource statistics, freq
is the frequency (in seconds) at which updates should be posted, and on_update
is the function that will be called every freq
seconds. The return value of mars_api::job_stat_stream
is an opaque stream handle that is owned by the caller. The stream will stop if the handle is destroyed.
The first parameter of the on_update
function is an error code (0
on success) and the second parameter is the Job resource utilization statistics. Like all mars_api
functions, mars_api::job_stat_stream
may throw a mars_api::mars_exception
. Additionally, mars_api::job_stat
is defined as follows:
struct job_stat
{
uint64_t rs_size; // resident set size (bytes)
uint64_t vm_size; // virtual memory size (bytes)
uint64_t job_seconds; // total run time of the job (seconds)
uint64_t cpu_seconds; // total CPU usage time of the job (seconds)
double cpu_freq; // CPU frequency of the job (kHz)
uint64_t disk_read; // total bytes read from disk by the job
uint64_t disk_write; // total bytes written to disk by the job
int node_count; // number of compute nodes in use by the job
int num_cpus; // number of CPU cores allocated to the job
};
In that case, the Plugin developer might change
namespace rstudio {
namespace launcher_plugins {
namespace quickstart {
class QuickStartResourceStream : public api::AbstractResourceStream
{
public:
/**
* @brief Constructor.
*
* @param in_job The job for which resource utilization metrics should be streamed.
* @param in_launcherCommunicator The communicator through which messages may be sent to the launcher.
*/
QuickStartResourceStream(
const api::ConstJobPtr& in_job,
comms::AbstractLauncherCommunicatorPtr in_launcherCommunicator);
/**
* @brief Initializes the resource utilization stream.
*
* @return Success if resource utilization streaming was started correctly; Error otherwise.
*/
Error initialize() override;
};
} // namespace quickstart
} // namespace launcher_plugins
} // namespace rstudio
in QuickStartResourceStream.hpp to
#include <memory>
namespace orchid {
namespace mars {
class MarsResourceStream :
public api::AbstractResourceStream,
public std::enable_shared_from_this<MarsResourceStream>
{
public:
/**
* @brief Constructor.
*
* @param in_job The job for which resource utilization metrics should be streamed.
* @param in_launcherCommunicator The communicator through which messages may be sent to the launcher.
*/
MarsResourceStream(
const api::ConstJobPtr& in_job,
comms::AbstractLauncherCommunicatorPtr in_launcherCommunicator);
/**
* @brief Initializes the resource utilization stream.
*
* @return Success if resource utilization streaming was started correctly; Error otherwise.
*/
Error initialize() override;
private:
typedef std::shared_ptr<MarsResourceStream> SharedThis;
typedef std::weak_ptr<MarsResourceStream> WeakThis;
// An opaque handle to the Job statistics stream.
std::shared_ptr<mars_api::stream_handle> m_streamHandle;
// The last measured number of CPU seconds.
int m_lastCpuSeconds;
// The last measured number of Job seconds.
int m_lastJobSeconds;
// Calculates CPU percent from the CPU and Job time.
double calcCpuPercent(int in_cpuSeconds, int in_jobSeconds);
// Callback function when Job statistic data is available.
static void onData(
WeakThis in_weakThis,
int in_errorCode,
const mars_api::job_stat* in_stats);
};
} // namespace mars
} // namespace orchid
in MarsResourceStream.hpp, and
#include <QuickStartResourceStream.hpp>
namespace rstudio {
namespace launcher_plugins {
namespace quickstart {
QuickStartResourceStream::QuickStartResourceStream(
const api::ConstJobPtr& in_job,
comms::AbstractLauncherCommunicatorPtr in_launcherCommunicator) :
api::AbstractResourceStream(in_job, in_launcherCommunicator)
{
}
Error QuickStartResourceStream::initialize()
{
// TODO #16: Create a resource utilization stream.
return Success();
}
} // namespace quickstart
} // namespace launcher_plugins
} // namespace rstudio
in QuickStartResourceStream.cpp to
#include <MarsResourceStream.hpp>
#include <Error.hpp>
namespace orchid {
namespace mars {
MarsResourceStream::MarsResourceStream(
const api::ConstJobPtr& in_job,
comms::AbstractLauncherCommunicatorPtr in_launcherCommunicator) :
api::AbstractResourceStream(in_job, in_launcherCommunicator),
m_lastCpuSeconds(0),
m_lastJobSeconds(0)
{
}
Error MarsResourceStream::initialize()
{
try
{
m_streamHandle = mars_api::job_stat_stream(
m_job->Id.c_str(),
3, // Get updates reasonably often, but don't use too many resources
std::bind(&MarsResourceStream::onData, weak_from_this(), std::placeholders::_1, std::placeholders::_2));
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
return Success();
}
double MarsResourceStream::calcCpuPercent(int in_cpuSeconds, int in_jobSeconds)
{
// Calculate CPU percent by dividing the change in the CPU seconds by the change in the Job's run time (also seconds)
double percent = -1.0;
// Avoid dividing by zero.
if (in_jobSeconds > m_lastJobSeconds)
{
double cpuChange = in_cpuSeconds - m_lastCpuSeconds;
double jobChange = in_jobSeconds - m_lastJobSeconds;
percent = (cpuChange / jobChange) * 100.0;
m_lastCpuSeconds = in_cpuSeconds;
m_lastJobSeconds = in_jobSeconds;
}
return percent;
}
void MarsResourceStream::onData(
WeakThis in_weakThis,
int in_errorCode,
const mars_api::job_stat* in_stats)
{
if (SharedThis sharedThis = in_weakThis.lock())
{
// Handle the error case first.
if (in_errorCode != 0)
{
logging::logWarningMessage(
"Failed to retrieve Job resource utilization data from Mars with error code " + std::to_string(in_errorCode),
ERROR_LOCATION);
// in_stats may not be valid when an error is reported, so stop here.
return;
}
// Otherwise populate the resource utilization data and post it.
rstudio::launcher_plugins::api::ResourceUtilData data;
double percent = sharedThis->calcCpuPercent(in_stats->cpu_seconds, in_stats->job_seconds);
if (percent >= 0.0)
data.CpuPercent = percent;
data.CpuSeconds = in_stats->cpu_seconds;
// Convert memory from B to MB.
data.VirtualMem = in_stats->vm_size / (1024 * 1024);
data.ResidentMem = in_stats->rs_size / (1024 * 1024);
sharedThis->reportData(data);
}
}
} // namespace mars
} // namespace orchid
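The onData callback above takes a weak pointer so that an update arriving after the stream object has been destroyed is ignored rather than touching freed memory. The following self-contained sketch shows that pattern in isolation. The `StatStream` class is a stand-in invented for this illustration, and it uses `shared_from_this()` (C++11) instead of `weak_from_this()` (C++17).

```cpp
#include <functional>
#include <memory>

// Stand-in class for illustration; not part of the SDK or the Mars API.
class StatStream : public std::enable_shared_from_this<StatStream>
{
public:
   StatStream() : m_updateCount(0) {}

   // Builds a callback that holds only a weak reference to this object.
   // The object must already be owned by a std::shared_ptr.
   std::function<void(int)> makeCallback()
   {
      std::weak_ptr<StatStream> weakThis = shared_from_this();
      return std::bind(&StatStream::onData, weakThis, std::placeholders::_1);
   }

   int getUpdateCount() const { return m_updateCount; }

private:
   static void onData(std::weak_ptr<StatStream> in_weakThis, int in_errorCode)
   {
      // lock() returns an empty pointer once the stream has been destroyed.
      if (std::shared_ptr<StatStream> sharedThis = in_weakThis.lock())
      {
         if (in_errorCode == 0)
            ++sharedThis->m_updateCount;
      }
   }

   int m_updateCount;
};
```

Because the callback does not hold a strong reference, it does not keep the stream alive; invoking it after the last `std::shared_ptr` has been released is a harmless no-op.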