6 Advanced Features
This section describes optional, advanced features of the SDK which are not covered in the RStudio Launcher Plugin SDK QuickStart Guide.
6.1 Error Handling
The RStudio Launcher Plugin SDK QuickStart Guide creates a new Error with an arbitrary name and code at each location where an error may occur. A more systematic approach to error handling is advisable. The specific implementation is entirely at the discretion of the Plugin developer; this section describes one possible organizational strategy.
For each category of error that may occur, create an enum which represents the specific types of errors in that category, and a function (or a few functions) which create Error
objects with the correct category name.
Error codes must not begin at 0 as that would be considered a ‘Success’ error code (i.e. not an error). Error codes may be reused across categories.
6.1.1 Example
Suppose that the Orchid Organization’s developer determines that there are only two categories of error that cannot be covered by a system or unknown error (available in Error.hpp
): Mars API errors, and internal errors.
The developer might create the file MarsError.hpp
as follows:
#ifndef ORCHID_MARS_ERROR_HPP_
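#define ORCHID_MARS_ERROR_HPP_
#include <string>
// The Mars API header which declares mars_api::mars_exception must also be included here.
namespace rstudio {
namespace launcher_plugins {
class Error;
class ErrorLocation;
} // namespace launcher_plugins
} // namespace rstudio
namespace orchid {
namespace mars {
using namespace rstudio::launcher_plugins;
/** Enum which represents an internal Plugin error. */
enum class InternalError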
{
SUCCESS = 0,
UNKNOWN = 1,
CONVERSION_FAILURE = 2,
JOB_NOT_FOUND = 3
};
/** Enum which represents a Mars API Error. */
enum class MarsError
{
SUCCESS = 0,
UNKNOWN = 1,
CONN_TIMEOUT = 2,
NOT_AUTHORIZED = 3,
UNSUPPORTED_VERSION = 4
};
Error createMarsError(
const mars_api::mars_exception& in_exception,
const ErrorLocation& in_location);
Error createMarsError(
const mars_api::mars_exception& in_exception,
const Error& in_cause,
const ErrorLocation& in_location);
Error createVersionError(
const std::string& in_supportedVersion,
const std::string& in_actualVersion,
const ErrorLocation& in_location);
Error createInternalError(
InternalError in_code,
const std::string& in_message,
const ErrorLocation& in_location);
Error createInternalError(
InternalError in_code,
const std::string& in_message,
const Error& in_cause,
const ErrorLocation& in_location);
} // namespace mars
} // namespace orchid
#endif
Then MarsError.cpp
might look like this:
#include "MarsError.hpp"
#include <Error.hpp>
namespace orchid {
namespace mars {
namespace {
constexpr const char* s_internalErrorName = "InternalPluginError";
constexpr const char* s_marsErrorName = "MarsApiError";
} // anonymous namespace
Error createMarsError(
const mars_api::mars_exception& in_exception,
const ErrorLocation& in_location)
{
return createMarsError(in_exception, Success(), in_location);
}
Error createMarsError(
const mars_api::mars_exception& in_exception,
const Error& in_cause,
const ErrorLocation& in_location)
{
MarsError code = MarsError::UNKNOWN;
if (in_exception.code() == 14) // Connection timeout
code = MarsError::CONN_TIMEOUT;
else if (in_exception.code() == 52) // Not authorized
code = MarsError::NOT_AUTHORIZED;
if (in_cause == Success())
return Error(
s_marsErrorName,
static_cast<int>(code),
in_exception.what(),
in_location);
return Error(
s_marsErrorName,
static_cast<int>(code),
in_exception.what(),
in_cause,
in_location);
}
Error createVersionError(
const std::string& in_supportedVersion,
const std::string& in_actualVersion,
const ErrorLocation& in_location)
{
return Error(
s_marsErrorName,
static_cast<int>(MarsError::UNSUPPORTED_VERSION),
"Mars API is version \"" +
in_actualVersion +
"\" which is not supported. Supported version(s): "
+ in_supportedVersion,
in_location);
}
Error createInternalError(
InternalError in_code,
const std::string& in_message,
const ErrorLocation& in_location)
{
return Error(
s_internalErrorName,
static_cast<int>(in_code),
in_message,
in_location);
}
Error createInternalError(
InternalError in_code,
const std::string& in_message,
const Error& in_cause,
const ErrorLocation& in_location)
{
return Error(
s_internalErrorName,
static_cast<int>(in_code),
in_message,
in_cause,
in_location);
}
} // namespace mars
} // namespace orchid
As an example of how these functions might be used, consider the example in TODO #10 of the RStudio Launcher Plugin SDK QuickStart Guide. With this new error handling, the Plugin developer may change the implementation to the following:
Error MarsJobStatusWatcher::getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
mars_api::job marsJob;
try
{
// std::stoul throws std::invalid_argument or std::out_of_range on failure; std::strtoul does not.
unsigned long id = std::stoul(in_jobId);
marsJob = mars_api::list_job(opts.getMarsServiceUser(), id);
}
catch (const std::invalid_argument& e)
{
return createInternalError(
InternalError::CONVERSION_FAILURE,
"Job ID " +
in_jobId +
" could not be converted to unsigned long: " +
e.what(),
ERROR_LOCATION);
}
catch (const std::out_of_range& e)
{
return createInternalError(
InternalError::CONVERSION_FAILURE,
"Job ID " +
in_jobId +
" is out of range to be converted to unsigned long: " +
e.what(),
ERROR_LOCATION);
}
catch (const mars_api::mars_exception& e)
{
return createMarsError(e, ERROR_LOCATION);
}
// This should only be invoked for Launcher Jobs because of the filtering in pollJobStatus,
// so return an error if somehow it's not a Launcher Job.
if (marsJob._name.find("[RStudio Launcher]") == std::string::npos)
return createInternalError(
InternalError::JOB_NOT_FOUND,
"Job " + in_jobId + " is not an RStudio Launcher Job.",
ERROR_LOCATION);
out_job = marsJobToJob(marsJob);
return Success();
}
With this approach, the Plugin developer has created much more consistent and informative errors.
6.2 Date-Time Support
The SDK provides a utility class for working with DateTime
objects (include/system/DateTime.hpp
). Dates and times can be converted from string to system::DateTime
using system::DateTime::fromString
, and to string using system::DateTime::toString
. When converting from string, the default expected time string format will be YYYY-MM-DDThh:mm:ss.ssssssTZ
, or %Y-%m-%dT%H:%M:%S%F%ZP
using the supported format specification. This is the ISO 8601 extended time format. When converting to string, the default output format is YYYY-MM-DDThh:mm:ss.ssssssZ
, or %Y-%m-%dT%H:%M:%S%FZ
. Note that the default output string is in UTC time.
To use a non-default format specification, a custom format specification may be provided to the conversion function. Below is a table which describes the possible format values. As with most string formats, characters which are not prefixed by %
will be included in the output (or should be included in the input) verbatim.
Value | Description | Example |
---|---|---|
%a | Short weekday | “Mon”, “Tue” |
%A | Long weekday | “Monday”, “Tuesday” |
%b | Short month | “Nov”, “Dec” |
%B | Long month | “November”, “December” |
%d | Numerical day of the month | “01” through “31”, as appropriate for the month |
%f | Fractional seconds | “04:01:33.000000”, “04:52:16.598763” |
%F | Fractional seconds, if non-zero | “04:01:33”, “04:52:16.598763” |
%H | Hour value, on the 24 hour clock | “00” through “23” |
%j | Numerical day of the year | “001” through “365” (or “366” on leap years) |
%m | Numerical month | “01” through “12” |
%M | Minute value | “00” through “59” |
%s | Seconds with fractional seconds | “28.003251” |
%S | Seconds | “28” |
%U | Numerical week of the year, starting on a Sunday | “00” through “53”, where the first Sunday in January is the first day of week 01 |
%w | Numerical day of the week, starting from 0 | “0” through “6” |
%W | Numerical week of the year, starting on a Monday | “00” through “53”, where the first Monday in January is the first day of week 01 |
%y | Two digit year | 2019 would be “19” |
%Y | Four digit year | “2019” |
%ZP | Posix time zone string | “-07:00”, “PST-08PDT+01,M4.1.0/02:00,M10.5.0/02:00”, “Z” |
More advanced formatting flags and additional documentation regarding the parsing and formatting of DateTime
objects can be found in Boost’s Date Time I/O documentation.
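The way format values combine with verbatim characters can be tried out without the SDK. The sketch below is not the SDK's DateTime class: it uses `std::get_time` and `std::put_time` from the C++ standard library, which understand only a subset of the values in the table above (for example `%Y`, `%m`, `%d`, `%H`, `%M`, `%S`, and `%y`, but not `%f`, `%F`, or `%ZP`). The helper names `parseTime` and `formatTime` are hypothetical.

```cpp
#include <ctime>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper: parses in_text according to in_format.
// Returns false if the text does not match the format.
bool parseTime(const std::string& in_text, const std::string& in_format, std::tm& out_time)
{
   out_time = std::tm();
   std::istringstream stream(in_text);
   stream >> std::get_time(&out_time, in_format.c_str());
   return !stream.fail();
}

// Hypothetical helper: formats in_time according to in_format. Characters which are not
// prefixed by % are copied to the output verbatim.
std::string formatTime(const std::tm& in_time, const std::string& in_format)
{
   std::ostringstream stream;
   stream << std::put_time(&in_time, in_format.c_str());
   return stream.str();
}
```

For instance, parsing `2019-11-05T04:52:16` with `%Y-%m-%dT%H:%M:%S` and formatting the result with `%d/%m/%y at %H:%M` keeps the literal `/`, `at`, and `:` characters and substitutes only the `%` values.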
6.3 User Profiles
It may be useful to allow system administrators to set default or maximum values for certain features on a per-user or per-group basis. For example, if a job scheduling system supports requesting an amount of memory for a job, system administrators may wish to give different memory levels to different groups of users. For more examples, see the sample /etc/rstudio/launcher.kubernetes.profiles.conf
in the Job Launcher Plugin Configuration section of the RStudio Job Launcher Guide.
For the convenience of the Plugin Developer, the AbstractUserProfiles
class may be overridden to quickly implement support for user profiles.
AbstractUserProfiles
contains protected templated functions for getting a value by name. The templates are defined in the CPP file, and are declared for the following types:
std::string
uint32_t
int32_t
uint64_t
int64_t
double
bool
std::set<U>, where U is one of the above types.
std::vector<U>, where U is one of the above types, except std::set.
std::map<U, V>, where U and V are any pair of the above types.
If a custom type is needed, retrieve the value as a string and then parse it as needed. For an example, see the TestUserProfiles
class in sdk/src/options/tests/UserProfilesTests.cpp
.
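As an illustration of parsing a custom type from a string, the sketch below converts a memory amount with a unit suffix into megabytes. The value format (`512MB`, `4GB`) and the helper name `parseMemoryMb` are assumptions made for this example and are not part of the SDK. In a profile class, the input string would be the value retrieved as a std::string.

```cpp
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical parser for a custom value type: a whole number followed by "MB" or "GB".
// Returns false if the value is malformed; out_megabytes is only set on success.
bool parseMemoryMb(const std::string& in_value, uint64_t& out_megabytes)
{
   // Find the end of the leading run of digits.
   std::size_t pos = 0;
   while ((pos < in_value.size()) && std::isdigit(static_cast<unsigned char>(in_value[pos])))
      ++pos;

   // Require at least one digit, and cap the length so the conversion cannot overflow.
   if ((pos == 0) || (pos > 12))
      return false;

   const uint64_t amount = std::stoull(in_value.substr(0, pos));
   const std::string unit = in_value.substr(pos);

   if (unit == "MB")
      out_megabytes = amount;
   else if (unit == "GB")
      out_megabytes = amount * 1024;
   else
      return false;

   return true;
}
```

The same function could be wrapped in a custom validator so that malformed values are rejected when the configuration file is validated rather than when a job is submitted.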
The minimum requirements to implement AbstractUserProfiles
are:
- A public constructor which sets the plugin name via the AbstractUserProfiles(const std::string& in_pluginName) constructor. Alternatively, a private constructor with a public static getInstance method may be used to implement the singleton pattern, which avoids reading the configuration file multiple times.
- An implementation of AbstractUserProfiles::getValidFieldNames which returns the set of all field names that may be set via the user profiles configuration file.
- An implementation of AbstractUserProfiles::validateValues which calls one of the two protected AbstractUserProfiles::validateValue methods for each valid field, with the appropriate template parameter.
If the above criteria are met, the expected location of the user profiles configuration file will be /etc/rstudio/launcher.<plugin name>.profiles.conf
.
The AbstractUserProfiles::validateValues
method is called by AbstractUserProfiles::initialize
after the user profiles configuration file has been read and parsed to ensure that any configuration mistakes within the file will be caught early. The AbstractUserProfiles::initialize
method should be called from the IJobSource::initialize
method to ensure that the user profiles configuration file has been read into memory and parsed before the Plugin enters normal operation mode. If the user profiles initialize
method returns an error, the IJobSource::initialize
method should also return an error.
6.3.1 Example
This example continues the examples started in the RStudio Launcher Plugin SDK QuickStart Guide. Assume that the Mars job scheduling system supports requesting a CPU count and an amount of memory, in MB. For simplicity, this example implements the MarsUserProfiles
class completely within the hpp
file.
MarsUserProfiles.hpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <options/AbstractUserProfiles.hpp>
#include <Error.hpp>
#include <system/User.hpp>
namespace orchid {
namespace mars {
namespace options {
using namespace rstudio::launcher_plugins;
class MarsUserProfiles : public options::AbstractUserProfiles
{
public:
static MarsUserProfiles& getInstance()
{
static MarsUserProfiles userProfiles;
return userProfiles;
}
uint32_t getDefaultCpus(const system::User& in_user) const
{
// Default value is 1.
uint32_t defaultCpus = 1;
Error error = getValueForUser("default-cpus", in_user, defaultCpus);
// It shouldn't be possible to get any Error except a not-found error here because of
// validateValues. If it somehow occurred in release, just return the default value.
assert(!error || isValueNotFoundError(error));
return defaultCpus;
}
uint32_t getMaxCpus(const system::User& in_user) const
{
// Default value is 1.
uint32_t maxCpus = 1;
Error error = getValueForUser("max-cpus", in_user, maxCpus);
// It shouldn't be possible to get any Error except a not-found error here because of
// validateValues. If it somehow occurred in release, just return the default value.
assert(!error || isValueNotFoundError(error));
return maxCpus;
}
uint32_t getMaxMemory(const system::User& in_user) const
{
// Default value is 10 MB.
uint32_t maxMemory = 10;
Error error = getValueForUser("max-memory-mb", in_user, maxMemory);
// It shouldn't be possible to get any Error except a not-found error here because of
// validateValues. If it somehow occurred in release, just return the default value.
assert(!error || isValueNotFoundError(error));
return maxMemory;
}
uint32_t getDefaultMemory(const system::User& in_user) const
{
// Default value is 5 MB.
uint32_t defaultMemory = 5;
Error error = getValueForUser("default-memory-mb", in_user, defaultMemory);
// It shouldn't be possible to get any Error except a not-found error here because of
// validateValues. If it somehow occurred in release, just return the default value.
assert(!error || isValueNotFoundError(error));
return defaultMemory;
}
private:
MarsUserProfiles() :
AbstractUserProfiles("mars")
{
m_validFieldNames.insert("max-cpus");
m_validFieldNames.insert("default-cpus");
m_validFieldNames.insert("max-memory-mb");
m_validFieldNames.insert("default-memory-mb");
}
const std::set<std::string>& getValidFieldNames() const override
{
return m_validFieldNames;
}
Error validateValues() const override
{
// For supported types, validateValue will attempt to parse every occurrence of the field as
// the specified type. If a custom type is desired, use the
// AbstractUserProfiles::validateValue(
// const std::string& in_value,
// const CustomValueValidator& in_validator) const;
// method to supply a custom validator instead. in_validator should parse the value it is
// supplied and return an error if parsing fails.
Error error = validateValue<uint32_t>("default-cpus");
if (error)
return error;
error = validateValue<uint32_t>("max-cpus");
if (error)
return error;
error = validateValue<uint32_t>("default-memory-mb");
if (error)
return error;
return validateValue<uint32_t>("max-memory-mb");
}
std::set<std::string> m_validFieldNames;
};
} // namespace options
} // namespace mars
} // namespace orchid
MarsJobSource.cpp
// Other includes...
#include <options/MarsOptions.hpp>
#include <options/MarsUserProfiles.hpp>
// Other methods...
Error MarsJobSource::initialize()
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
try
{
mars_api::init(opts.host(), opts.port(), opts.useSsl());
}
catch (const mars_api::mars_exception& e)
{
return Error("MarsApiError", e.code(), e.what(), ERROR_LOCATION);
}
const options::MarsUserProfiles& userProfiles = options::MarsUserProfiles::getInstance();
return userProfiles.initialize();
}
// Other methods..
6.4 Custom Job Source Configuration
Important: This feature is not exposed through the RStudio Workbench job launching UI. The use of this feature will require a feature request to the RStudio IDE project. This feature should only be used when there are no other alternatives.
The Cluster Info Response is used to report the configuration and capabilities of the Plugin. The RStudio Launcher Plugin SDK QuickStart Guide describes how the Plugin Developer may declare support for various types of resource limits, containers, custom job placement constraints, and job queues. If a job configuration setting is not covered by one of those built-in job settings, the Job Config feature may be used.
A JobConfig
value consists of the name of the configuration setting, its type, and optionally its value. It may have one of four types: string
, int
, enum
, or float
.
To declare support for a custom job configuration value, create a JobConfig
object that represents the name and type of that value and add it to the JobSourceConfiguration::CustomConfig
vector in the overridden implementation of IJobSource::getConfiguration
.
When a job is submitted, any custom configuration values that were set on the job can be found on Job::Config
.
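The declare-then-read flow described above can be sketched with simplified stand-in types. The structs below are not the SDK definitions: only the member names CustomConfig and Config come from this guide, while the field layout of `JobConfig`, the setting name `mars-priority`, and the two helper functions are assumptions made for illustration.

```cpp
#include <string>
#include <vector>

// Simplified stand-ins for the SDK types named above. The real definitions are provided by
// the SDK and may differ.
struct JobConfig
{
   enum class Type { STRING, INT, ENUM, FLOAT };

   std::string Name;
   Type ValueType;
   std::string Value; // Left empty when only declaring support for the setting.
};

struct JobSourceConfiguration
{
   std::vector<JobConfig> CustomConfig;
};

struct Job
{
   std::vector<JobConfig> Config;
};

// Declares a hypothetical "mars-priority" setting, as an overridden getConfiguration might.
void declareCustomConfig(JobSourceConfiguration& out_configuration)
{
   out_configuration.CustomConfig.push_back({ "mars-priority", JobConfig::Type::INT, "" });
}

// Looks up a custom setting on a submitted job. Returns false if the setting was not set.
bool getCustomConfigValue(const Job& in_job, const std::string& in_name, std::string& out_value)
{
   for (const JobConfig& config : in_job.Config)
   {
      if (config.Name == in_name)
      {
         out_value = config.Value;
         return true;
      }
   }

   return false;
}
```

Because the value arrives as text in this sketch, the Plugin would still need to convert it to the declared type before passing it to the Job Scheduling System.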
6.5 Job Status Updates
The Plugin needs to keep an accurate record of all of the Jobs that were submitted to the Job Scheduling System by the Launcher. This can be implemented in any way that suits the Job Scheduling System as long as JobStatusNotifier::updateJobStatus
is invoked each time the status of a job changes. The JobStatusNotifier::updateJobStatus
method validates that the new status really is an update, so there is no need for the Plugin implementation to perform that check.
The two most common ways to implement this feature are streaming the Job statuses and polling the Job statuses. Both methods can be implemented with the help of the AbstractJobStatusWatcher
base class; however, streaming is the preferred method, as it should be more efficient than polling.
The AbstractTimedJobStatusWatcher
class, which extends the AbstractJobStatusWatcher
class, implements common functionality for polling job statuses. For more details about implementing Job status updates via polling, see TODOs #9 through #11 in the RStudio Launcher Plugin SDK QuickStart Guide.
6.5.1 Streaming
Streaming is the preferred method for implementing job status updates, as it can be more efficient than polling job statuses. This is because polling requires making a job status request of the Job Scheduling System every interval of time, and may result in reading the same status multiple times before a change is observed. If the Job Scheduling System provides an API that streams Job status changes, the Plugin should only have to process each status change once.
6.5.1.1 Example
Suppose that the Mars API provides a stream_statuses
function which takes a callback function as a parameter with the signature std::function<void(const mars_api::job_status&)>
. Assume that the mars_api::job_status
structure has the Job ID, the Job name, the Job Status, the last modification time, and the reason for the current Job status. Then the Plugin developer might change the implementation of MarsJobStatusWatcher
to the following:
MarsJobStatusWatcher.hpp
:
#ifndef ORCHID_MARS_MARS_JOB_STATUS_WATCHER_HPP
#define ORCHID_MARS_MARS_JOB_STATUS_WATCHER_HPP
#include <jobs/AbstractJobStatusWatcher.hpp>
#include <memory>
namespace orchid {
namespace mars {
class MarsJobStatusWatcher :
public jobs::AbstractJobStatusWatcher,
public std::enable_shared_from_this<MarsJobStatusWatcher>
{
public:
/**
* @brief Constructor.
*
* @param in_jobRepository The job repository, from which to look-up jobs.
* @param in_jobStatusNotifier The job status notifier to which to post job updates.
*/
MarsJobStatusWatcher(
jobs::JobRepositoryPtr in_jobRepository,
jobs::JobStatusNotifierPtr in_jobStatusNotifier);
/**
* @brief Starts the Job status watcher.
*
* @return Success if the Job status watcher could be started; Error otherwise.
*/
Error start();
/**
* @brief Stops the Job status watcher.
*/
void stop();
private:
/**
* @brief Handles a change in job status when it is reported by the Mars Job Scheduling System.
*
* @param in_jobStatus The new job status.
*/
void onJobStatusUpdate(const mars_api::job_status& in_jobStatus);
/**
* @brief Gets the job details for the specified job.
*
* @param in_jobId The ID of the job to retrieve.
* @param out_job The populated Job object.
*
* @return Success if the job details could be retrieved and parsed; Error otherwise.
*/
Error getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const override;
// The Job status stream.
std::unique_ptr<mars_api::status_stream> m_jobStream;
};
/** Convenience typedef. */
typedef std::shared_ptr<MarsJobStatusWatcher> MarsJobStatusWatcherPtr;
} // namespace mars
} // namespace orchid
#endif
MarsJobStatusWatcher.cpp
:
#include "MarsJobStatusWatcher.hpp"
namespace orchid {
namespace mars {
typedef std::shared_ptr<MarsJobStatusWatcher> SharedThis;
typedef std::weak_ptr<MarsJobStatusWatcher> WeakThis;
MarsJobStatusWatcher::MarsJobStatusWatcher(
jobs::JobRepositoryPtr in_jobRepository,
jobs::JobStatusNotifierPtr in_jobStatusNotifier) :
jobs::AbstractJobStatusWatcher(
std::move(in_jobRepository),
std::move(in_jobStatusNotifier))
{
}
Error MarsJobStatusWatcher::start()
{
// Capture a weak pointer so that the callback cannot extend the lifetime of this object.
WeakThis weakThis = shared_from_this();
try
{
m_jobStream = mars_api::stream_statuses(
[weakThis](const mars_api::job_status& in_jobStatus)
{
if (SharedThis sharedThis = weakThis.lock())
sharedThis->onJobStatusUpdate(in_jobStatus);
});
}
catch (const mars_api::mars_exception& e)
{
return createMarsError(e, ERROR_LOCATION);
}
return Success();
}
void MarsJobStatusWatcher::stop()
{
m_jobStream.reset();
}
void MarsJobStatusWatcher::onJobStatusUpdate(const mars_api::job_status& in_jobStatus)
{
if (in_jobStatus._name.find("[RStudio Launcher]") != std::string::npos)
{
system::DateTime lastModified;
Error error = system::DateTime::fromString(in_jobStatus._last_modified, lastModified);
if (error)
{
// Use the current time as the invocation time instead, but log an error.
logging::logError(error, ERROR_LOCATION);
error = updateJobStatus(
std::to_string(in_jobStatus._id),
marsStatusToStatus(in_jobStatus._status),
in_jobStatus._reason);
}
else
{
error = updateJobStatus(
std::to_string(in_jobStatus._id),
marsStatusToStatus(in_jobStatus._status),
in_jobStatus._reason,
lastModified);
}
if (error)
logging::logError(error, ERROR_LOCATION);
}
}
Error MarsJobStatusWatcher::getJobDetails(const std::string& in_jobId, api::JobPtr& out_job) const
{
const options::MarsOptions& opts = options::MarsOptions::getInstance();
mars_api::job marsJob;
try
{
// std::stoul throws std::invalid_argument or std::out_of_range on failure; std::strtoul does not.
unsigned long id = std::stoul(in_jobId);
marsJob = mars_api::list_job(opts.getMarsServiceUser(), id);
}
catch (const std::invalid_argument& e)
{
return createInternalError(
InternalError::CONVERSION_FAILURE,
"Job ID " +
in_jobId +
" could not be converted to unsigned long: " +
e.what(),
ERROR_LOCATION);
}
catch (const std::out_of_range& e)
{
return createInternalError(
InternalError::CONVERSION_FAILURE,
"Job ID " +
in_jobId +
" is out of range to be converted to unsigned long: " +
e.what(),
ERROR_LOCATION);
}
catch (const mars_api::mars_exception& e)
{
return createMarsError(e, ERROR_LOCATION);
}
// This should only be invoked for Launcher Jobs because of the filtering in onJobStatusUpdate,
// so return an error if somehow it's not a Launcher Job.
if (marsJob._name.find("[RStudio Launcher]") == std::string::npos)
return createInternalError(
InternalError::JOB_NOT_FOUND,
"Job " + in_jobId + " is not an RStudio Launcher Job.",
ERROR_LOCATION);
out_job = marsJobToJob(marsJob);
return Success();
}
} // namespace mars
} // namespace orchid
6.5.2 Other Methods
It is possible that neither streaming nor polling is the best solution for keeping job statuses up to date. The use of an AbstractJobStatusWatcher
is completely optional, and the Plugin developer may choose to implement this feature in any way that best suits the Job Scheduling System. For example, the provided sample Local Plugin does not use an AbstractJobStatusWatcher
. Jobs are launched on the local system by forking a new process and running the requested command in that process. The Local Plugin receives notifications when the child process writes to standard out, writes to standard error, or exits. When the process exits, the Job state is transitioned from Job::State::RUNNING
to Job::State::FINISHED
. The Local Plugin also keeps track of when the process should transition from Job::State::PENDING
to Job::State::RUNNING
by checking whether the executable name has changed from rsandbox
(a utility for launching processes in an isolated environment provided with the RStudio Workbench installation) to the name of the actual executable for the Job.
6.6 Customizing the Job Repository
In ‘TODO #8’ of the ‘RStudio Launcher Plugin SDK QuickStart Guide’, the Plugin developer implemented the AbstractJobRepository::loadJobs
method to populate the Job Repository on bootstrap. In the case that the Plugin needs to do special processing when a Job is added or removed from the repository, it can do so by overriding the other virtual methods on AbstractJobRepository
.
An example of when this may be necessary is if the Plugin needs to do additional Job state persistence, beyond what the Job Scheduling System will save. A common case of this is Job output. If the user does not specify an output file, the Job Scheduling System may not persist the Job output; however, it must be available to the Launcher until the Job expires according to the Launcher’s configured job-expiry-hours
.
There are three additional virtual methods on AbstractJobRepository
that allow the Plugin developer to customize the behavior of the Job Repository:
- AbstractJobRepository::onJobAdded: this method will be invoked when a job is first added to the repository, immediately after successful submission.
- AbstractJobRepository::onJobRemoved: this method will be invoked when an expired Job is removed from the system. Any files or other persistent data that were created by the Plugin should be cleaned up in this method.
- AbstractJobRepository::onInitialize: this method will be invoked once, when the Job Repository is initialized during bootstrap. The Plugin may do any extra initialization steps that are required and is responsible for returning an Error if any necessary initialization steps fail.
The provided sample Local Launcher Plugin manages Job persistence completely within the Plugin. The LocalJobRepository
implementation may be used as an example for the implementation of all three virtual methods on AbstractJobRepository
.
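The division of responsibilities between the three hooks can be sketched as follows. The base class here is a simplified stand-in, not the SDK's AbstractJobRepository: the hook signatures, the `bool` return value used in place of an Error, and the `MarsJobRepository` class are assumptions made for illustration.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-ins for the SDK types. The real Job and AbstractJobRepository differ.
struct Job
{
   std::string Id;
};
typedef std::shared_ptr<Job> JobPtr;

class AbstractJobRepository
{
public:
   virtual ~AbstractJobRepository() = default;

   bool initialize() { return onInitialize(); }
   void addJob(const JobPtr& in_job) { onJobAdded(in_job); }
   void removeJob(const JobPtr& in_job) { onJobRemoved(in_job); }

private:
   virtual bool onInitialize() { return true; }
   virtual void onJobAdded(const JobPtr&) { }
   virtual void onJobRemoved(const JobPtr&) { }
};

// Hypothetical repository which tracks one Plugin-managed output file per Job.
class MarsJobRepository : public AbstractJobRepository
{
public:
   explicit MarsJobRepository(std::string in_outputDir) :
      m_outputDir(std::move(in_outputDir))
   {
   }

   std::size_t trackedFileCount() const { return m_outputFiles.size(); }

private:
   bool onInitialize() override
   {
      // Fail bootstrap if there is nowhere to persist Job output.
      return !m_outputDir.empty();
   }

   void onJobAdded(const JobPtr& in_job) override
   {
      m_outputFiles[in_job->Id] = m_outputDir + "/" + in_job->Id + ".out";
   }

   void onJobRemoved(const JobPtr& in_job) override
   {
      // A real Plugin would also delete the file from disk here.
      m_outputFiles.erase(in_job->Id);
   }

   std::string m_outputDir;
   std::map<std::string, std::string> m_outputFiles;
};
```

Keeping the bookkeeping inside the hooks means the rest of the Plugin never has to remember to clean up after an expired Job.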
6.7 Process Launching
Depending on the API exposed by the Job Scheduling System, it may be necessary to launch child processes to perform actions on the Job Scheduling System, such as running a job or listing the jobs in the system. The SDK provides a number of classes and functions for launching child processes in the system/Process.hpp
header file.
Child processes launched through the SDK provided process module are run through the rsandbox
process by default. This ensures that the child process runs in an isolated environment; however, it prevents the parent process from continuing to write standard input to the child process. If the Plugin needs to do so, it can launch the child process directly and keep the standard input stream open by setting system::process::ProcessOptions::UseSandbox = false
and system::process::ProcessOptions::CloseStdIn = false
respectively.
The SDK process launching module will escape the command, arguments, standard input, the standard output and standard error files, and environment variables as appropriate. The command, arguments and environment values will be treated literally. Bash expansion of them will not take place. Bash expansion may take place within the standard input, however.
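To make "treated literally" concrete, the sketch below shows one common way to quote a value for a POSIX shell so that no expansion takes place. It is not the SDK's implementation, which is not shown in this guide, and the helper name `shellQuote` is hypothetical.

```cpp
#include <string>

// Hypothetical helper: wraps in_value in single quotes so that a POSIX shell treats it as a
// single literal word. An embedded single quote is written as '\'' (close the quoted string,
// emit an escaped quote, reopen the quoted string).
std::string shellQuote(const std::string& in_value)
{
   std::string quoted = "'";
   for (const char c : in_value)
   {
      if (c == '\'')
         quoted += "'\\''";
      else
         quoted += c;
   }

   quoted += "'";
   return quoted;
}
```

With quoting of this kind, an argument such as `$HOME` reaches the child process as the five characters `$HOME` rather than as the expanded home directory.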
6.8 Custom Output Streams
To create a custom output stream, the Plugin developer must create a class which inherits api::AbstractOutputStream
and implements the api::AbstractOutputStream::start
and api::AbstractOutputStream::stop
methods.
In the api::AbstractOutputStream::start
method, the output stream implementation should begin reporting the Job’s output. To report output, the implementation must invoke the protected method api::AbstractOutputStream::reportData
specifying the data and the type of output. The output type will be OutputType::STDOUT
for standard output, OutputType::STDERR
for standard error, or, if the two cannot be distinguished, OutputType::BOTH
. It may not be possible to tell the output type if the Job specified the same output location for both standard output and standard error output.
When the stream has completed, the output stream implementation should invoke the protected method api::AbstractOutputStream::setStreamComplete
. The stream is complete when the Job has finished emitting all output. This can only happen if the Job is in a completed state, which can be tested with api::Job::isCompleted
. Even if a Job has completed, some Job Scheduling Systems buffer job output, so it may take a few seconds after the Job has completed for the remainder of the job output to be emitted.
If api::AbstractOutputStream::stop
has been invoked, the output stream implementation should stop streaming data, even if the stream has not been completed.
For an example of a correct and complete implementation of an api::AbstractOutputStream
child class, please refer to api::FileOutputStream
.
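The contract described in this section (report data from start, mark the stream complete when all output has been emitted, and stop early when asked) can be sketched with a simplified stand-in base class. The class below is not api::AbstractOutputStream: its constructor, callback types, and method signatures are assumptions made for illustration, and `BufferedOutputStream` is a hypothetical stream that replays output already captured from a completed Job.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

enum class OutputType { STDOUT, STDERR, BOTH };

// Simplified stand-in for api::AbstractOutputStream. The real class and signatures differ.
class AbstractOutputStream
{
public:
   typedef std::function<void(const std::string&, OutputType)> OnOutput;
   typedef std::function<void()> OnComplete;

   AbstractOutputStream(OnOutput in_onOutput, OnComplete in_onComplete) :
      m_onOutput(std::move(in_onOutput)),
      m_onComplete(std::move(in_onComplete))
   {
   }

   virtual ~AbstractOutputStream() = default;

   virtual bool start() = 0;
   virtual void stop() = 0;

protected:
   void reportData(const std::string& in_data, OutputType in_type) { m_onOutput(in_data, in_type); }
   void setStreamComplete() { m_onComplete(); }

private:
   OnOutput m_onOutput;
   OnComplete m_onComplete;
};

// Hypothetical stream which replays lines that were captured from an already completed Job.
class BufferedOutputStream : public AbstractOutputStream
{
public:
   BufferedOutputStream(
      std::vector<std::string> in_lines,
      OnOutput in_onOutput,
      OnComplete in_onComplete) :
         AbstractOutputStream(std::move(in_onOutput), std::move(in_onComplete)),
         m_lines(std::move(in_lines)),
         m_stopped(false)
   {
   }

   bool start() override
   {
      for (const std::string& line : m_lines)
      {
         // If stop() was requested, end early without marking the stream complete.
         if (m_stopped)
            return true;

         // The source does not distinguish standard output from standard error.
         reportData(line, OutputType::BOTH);
      }

      setStreamComplete();
      return true;
   }

   void stop() override { m_stopped = true; }

private:
   std::vector<std::string> m_lines;
   bool m_stopped;
};
```

A production implementation would usually stream from another thread and would need to synchronize access to the stop flag; that is omitted here to keep the sketch short.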
6.8.1 Customizing the File Output Stream
It is possible that the Plugin will be able to read Job output from a file, but it will need to process the Job output in some way before surfacing the output to the user. For example, the RStudio Slurm Launcher Plugin emits one line of output at the start of each Job that represents extra Job metadata that it needs, and one line at the end of each Job to indicate that all output has been emitted. In that case, the Plugin may customize the behavior of the api::FileOutputStream
class by inheriting from it and overriding api::FileOutputStream::onOutput
and/or api::FileOutputStream::waitForStreamEnd
. By default api::FileOutputStream::onOutput
emits every line of output and api::FileOutputStream::waitForStreamEnd
waits for a fixed short period of time after the Job enters a completed state before ending the stream.
The RStudio Slurm Launcher Plugin would override api::FileOutputStream::onOutput
to skip the first and last lines of output, and to notify a condition variable when the last line of output is emitted. It would override api::FileOutputStream::waitForStreamEnd
to wait on the aforementioned condition variable instead of waiting for a fixed period of time.
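The filtering and end-of-stream signalling described above might look like the following sketch. It is a standalone class rather than a real api::FileOutputStream subclass: the method signatures, the end marker string, and the timeout parameter (added here as a safety net) are all assumptions made for illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <vector>

namespace {

// Hypothetical marker which the Job writes as its last line of output.
const char* const END_MARKER = "##MARS-END-OF-OUTPUT##";

} // anonymous namespace

// Hypothetical stand-in for a class which customizes the file output stream.
class MarkerFilteringStream
{
public:
   // Invoked for each line read from the output file.
   void onOutput(const std::string& in_line)
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      if (!m_sawMetadata)
      {
         // The first line carries extra Job metadata: consume it without emitting it.
         m_sawMetadata = true;
         return;
      }

      if (in_line == END_MARKER)
      {
         // The last line signals that all output has been emitted: wake any waiter.
         m_ended = true;
         lock.unlock();
         m_endCondition.notify_all();
         return;
      }

      m_emitted.push_back(in_line);
   }

   // Blocks until the end marker has been seen or the timeout elapses.
   // Returns true if the end marker was seen.
   bool waitForStreamEnd(std::chrono::milliseconds in_timeout)
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      return m_endCondition.wait_for(lock, in_timeout, [this]() { return m_ended; });
   }

   // Not synchronized: call only after streaming has finished.
   const std::vector<std::string>& emitted() const { return m_emitted; }

private:
   std::mutex m_mutex;
   std::condition_variable m_endCondition;
   bool m_sawMetadata = false;
   bool m_ended = false;
   std::vector<std::string> m_emitted;
};
```

Waiting on an explicit marker avoids both ending the stream before buffered output arrives and delaying it longer than necessary.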