Medusa Cluster Server – Technical Documentation

1            Overview

The Medusa cluster server is a high-performance clustering engine providing services to distribute Java processes amongst multiple processing resources. Its main characteristics are:

 

-         Low system resource requirements

-         Runs on standard JRE 1.3 (100% pure Java)

-         Dynamic auto-configuring cluster and fail-over

-         Highly scalable

-         Provides options for performance tuning

-         State synchronization of theoretically unlimited data size

-         Runs on multiple/mixed server hardware and supports multiple instances per IP address

-         Configurable logging options

2           Technical Introduction

The Medusa clustering engine takes care of most of the work involved in clustering, such as distributing tasks amongst the cluster and managing cluster members as they start up, shut down, and fail over. It also provides a simple and powerful API for users to implement their own processing logic.

 

The Medusa cluster server consists of three components:

-         A local resource manager for executing user-defined business logic (jobs)

-         The cluster server managing the logical and physical cluster

-         A client server for managing external clients requesting logical cluster resources

 

2.1          ResourceManager & Jobs

The ResourceManager is responsible for scheduling and executing jobs on the local server. The server maintains a user-definable number of Timers that execute jobs once or periodically.

 

2.1.1        Programming, Loading and Initializing Jobs

Jobs are user-created classes that perform specific tasks when executed on the server. They extend the Medusa Job class (and, in turn, the JDK TimerTask class) and thus begin processing when their run() method is called.

Jobs should be made available to the server as compiled classes in the jobs directory. When the server receives a request to execute a task, the JobClassLoader attempts to load the class from this directory and construct a Job object. This object is then passed to the JobScheduler, which runs it as a TimerTask.

Initialization of jobs is performed dynamically via the API or through properties files in the jobs directory. If requested, the JobClassLoader will look for a <class-name>.properties file matching the name of the job class and pre-initialize the resulting object with its data when loading the job. Examples follow later.
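For illustration, the jobs directory for the demo classes used later in this document could look as follows (the layout simply reflects the loading rules just described):

jobs/
    DemoJob.class
    DemoMasterJob.class
    DemoMasterJob.properties      (pre-initialization data for the master job)
    DemoMasterData.properties     (user data loaded programmatically by DemoMasterJob)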

Job properties, whether supplied via the dynamic API or loaded from a file, are applied through getter and setter methods. For a property called TESTPROPERTY, for example, the JobClassLoader will look for a method public void setTestproperty(String value) to initialize the job object. (Note that only the first letter of a composed property name is capitalized!) If the required data type differs, the method should still take a String parameter and convert the value to the desired type internally.
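A minimal sketch of this setter mechanism (the class and property names are purely illustrative):

public class TimeoutDemoJob extends Job {

  /** Converted value of the TIMEOUT property */
  private long timeout;

  /** Called by the JobClassLoader for a TIMEOUT property entry */
  public void setTimeout(String value) {
    // Convert the String value to the desired type
    timeout = Long.parseLong(value);
  }

  /** Processing logic using the converted value would go here */
  public void run() {}
}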

If initialization data is too complex to be converted into String properties it cannot be loaded from a properties file, but the dynamic API provides a mechanism to pass in any serializable data type. Before explaining this mechanism we first need to introduce Joblets and the different job types.

2.1.1.1       Joblets

For performance reasons, and to limit bandwidth usage between servers, it is advantageous to send job requests to cluster servers without serializing the complete job object, including business logic and data; instead, Medusa uses Joblets. A Joblet principally contains the information needed to identify a job class and its data. Joblets can be used to request that cluster members start jobs, stop jobs, or return results from jobs.

Apart from the job class name and a unique job identifier, Joblets contain JobConfig objects, which are essentially wrappers for job data. The method public void setProperties(Properties config) in JobConfig implements the dynamic API discussed in the paragraph above. Additionally, the method public void setAdditionalProperties(Serializable config) provides a mechanism to transmit any Serializable data object. The JobClassLoader will not, however, automatically initialize job classes with this data; it must be handled programmatically by the user.
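For example, a complex payload could be attached to a Joblet as follows (a hedged sketch: the payload content is illustrative, and reading it back inside the job is left to user code, since the JobClassLoader does not apply it automatically):

// java.util.HashMap is Serializable and can carry arbitrary entries
HashMap payload = new HashMap();
payload.put("TARGET_HOSTS", "hostA hostB");   // illustrative entry

Joblet joblet = new Joblet();
joblet.setJobClassName("DemoJob");
JobConfig config = new JobConfig();
config.setAdditionalProperties(payload);
joblet.setData(config);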

The action to perform with a Joblet is defined in its command, which can be accessed with the methods public void setCommand(int command) and public int getCommand().

 

Commands typically used to initiate actions with Joblets include

-          JOBLET_START   : Start a job

-          JOBLET_REASSIGN: Reassign a job to a new server

-          JOBLET_STOP    : Terminate a job and allow it to exit gracefully

-          JOBLET_KILL    : Immediately kill a job

-          JOBLET_UPDATE  : Update a job with new data

 

Commands typically returning status information about jobs include

-          JOBLET_RUNNING : A server accepted a job request

-          JOBLET_ERROR   : An error occurred while processing the job

-          JOBLET_ABORTED : A job terminated abnormally

-          JOBLET_FINISHED: A job completed its task and is done processing

 

Users should be able to program most applications with the simple JOBLET_START and JOBLET_STOP commands.
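For example, from within a master job a job can be started and later terminated by reusing the same Joblet instance (a minimal sketch based on the DemoMasterJob example in section 2.1.1.3; error handling omitted):

Joblet job = new Joblet();
job.setJobClassName("DemoJob");
job.setCommand(JobConstants.JOBLET_START);
String refID = getClusterService().submitJob(job);

// ... later: ask the cluster to terminate the job gracefully
job.setCommand(JobConstants.JOBLET_STOP);
getClusterService().submitJob(job);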

 

2.1.1.2       Cluster Jobs

Jobs fall into two categories: master jobs and cluster jobs. Technically both are implemented with the same API, but servers treat them differently. Everything that is true for cluster jobs also applies to master jobs, except for the differences listed in the master job section.

 

A cluster job is essentially a worker bean implementing the Runnable interface. When the ResourceManager receives a job request it asks the JobClassLoader to load and initialize the object, then schedules it as a TimerTask on a Timer. The number of Timer threads is configurable via the SYSTEM_CONCURRENT_THREADS property in system.properties (see section 3.1). If the number is too small for the job load, jobs will be queued and delayed; if it is too large and many jobs run in parallel, execution of individual jobs may slow down. Users should experiment with their individual implementation to find the value that performs best for the kind of tasks being processed.

Each job on a cluster is identified by a unique ID used for call-backs and fail-over; this ID is managed automatically by the system and is composed of the IP address of the master server that requested the job, a timestamp, and an incremental number.
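For illustration only, such an ID might look like 192.168.1.10:1063372800000:17; the colon-separated layout is an inference from the ID parsing in the cleanJobs() method of the DemoMasterJob example in section 2.1.1.3.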

 

Example: A simple job:

 

public class DemoJob extends Job {

  /** Storage for counter */
  private int i = 0;

  /** Default constructor */
  public DemoJob() {}

  /** This method is executed when the job is being processed */
  public void run() {
    i++;
    System.out.println("DemoJob: " + i);
  }
}

 

2.1.1.3       Master Jobs

Master jobs are jobs that are executed only on the active cluster master server. Should the master server crash, another server will assume the master role and start processing master jobs.

Master jobs provide the link between the entity that initiated a resource request and the cluster as a logical unit. This entity can be anything from an external client to a monitoring agent embedded in the master job.

Master jobs are defined in the system.properties file as a space-separated list of class names in the JOB_MASTER property and are loaded at system start-up.

Since master jobs are executed as start-up jobs on the master server, they are initialized with properties files as discussed above rather than through the dynamic API. The JobClassLoader will look for a properties file in the jobs directory with the same name as the master job class but with a .properties extension.

There is no theoretical limit on the number of master jobs that can run on a server, but the number should be kept to a minimum; one master job should typically be sufficient. Master jobs do not use up threads from the cluster job thread pool but run in their own threads, which of course still consumes system resources.

 

Example: A master job and its configuration files - DemoMasterJob:

 

DemoMasterJob will be loaded at startup and initialized with the DemoMasterJob.properties file. It will then be scheduled to run periodically every 2.5 seconds (PERIOD = 2500).

When running, DemoMasterJob performs two tasks in its run() method every cycle:

a)      Parse all cluster jobs it has spawned so far and check for timed-out candidates. This demonstrates how a client (in this case the master job itself) can keep a reference to a job on the logical cluster without knowledge of the physical server executing the task.

b)      Spawn a new simple DemoJob (as shown in the example above) and initialize it via the Joblet API.

 

 

DemoMasterJob.properties - properties file for initializing the job:

 

DELAY  = 0
PERIOD = 2500
JOBTIMEOUT = 50000

 

The properties DELAY and PERIOD are parameters processed by the parent class Job; they define the delay (in milliseconds) before starting the job and the period (in milliseconds) at which to repeat it.
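Presumably (an assumption about the internals, based on Job extending the JDK TimerTask class), these two values map directly onto the standard java.util.Timer scheduling calls:

// delay only: execute the job once after 'delay' milliseconds
timer.schedule(job, delay);
// delay and period: execute after 'delay' ms, then repeat every 'period' ms
timer.schedule(job, delay, period);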

 

DemoMasterData.properties - user-defined properties used in the DemoMasterJob class:

 

CLASS_NAME = DemoJob
DELAY = 0
PERIOD = 2000

 

The property CLASS_NAME tells DemoMasterJob which job class to request on the cluster. DELAY and PERIOD are used to initialize this new job class. Please note that this is simply an example data source for a master job; in a more sophisticated scenario the master job could obtain its data from a dynamic source such as a database or by computation, as sketched below.
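As a hedged sketch of such an alternative, the loadData() method shown below could be replaced by a JDBC variant along these lines (the connection URL and table layout are hypothetical; the Medusa calls mirror loadData()):

// Requires java.sql.* and java.util.Properties
private Joblet loadDataFromDatabase() {
  Joblet job = null;
  try {
    Connection con = DriverManager.getConnection("jdbc:odbc:jobs"); // hypothetical URL
    Statement stmt = con.createStatement();
    ResultSet rs = stmt.executeQuery(
        "SELECT CLASS_NAME, DELAY, PERIOD FROM JOB_REQUESTS");      // hypothetical table
    if (rs.next()) {
      Properties data = new Properties();
      data.setProperty("DELAY", rs.getString("DELAY"));
      data.setProperty("PERIOD", rs.getString("PERIOD"));
      job = new Joblet();
      job.setJobClassName(rs.getString("CLASS_NAME"));
      job.setCommand(JobConstants.JOBLET_START);
      JobConfig config = new JobConfig();
      config.setProperties(data);
      job.setData(config);
    }
    con.close();
  }
  catch (SQLException e) {
    System.out.println("Failed to load job data from database: " + e);
    return (null);
  }
  return (job);
}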

 

The DemoMasterJob class:

 

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;

public class DemoMasterJob extends Job {

  /** Keeps a reference to all spawned jobs */
  private Set jobs = new HashSet();

  /** Time-out after which jobs will be cancelled */
  private long jobTimeOut;

  /** Default constructor */
  public DemoMasterJob() {}

  /**
   *  Entry point when the job is being processed
   */
  public void run() {

    // Remove outdated jobs
    try {
      cleanJobs();
    }
    catch (ClusterException e) {
      System.out.println("Failed to send job cancel request: " + e);
    }

    // Load a new job
    Joblet job = loadData();
    if (job == null) {
      return;
    }
    try {
      String refID = getClusterService().submitJob(job);
      System.out.println("Submitted job [" + refID + "] to the cluster");
      // Add it to the set of jobs
      jobs.add(job);
    }
    catch (ClusterException e) {
      System.out.println("Failed to request job: " + e);
    }
  }

  /**
   *  Creates a joblet based on a user-created properties file (see below).
   *  Instead of loading this properties file, the data could be obtained
   *  from any other source: a database, programmatically, ...
   *
   * @return A new joblet if successful; null if not
   */
  private Joblet loadData() {
    // Load the user properties file
    String fileName = "jobs/DemoMasterData.properties";
    Properties data = new Properties();
    try {
      data.load(new FileInputStream(fileName));
    }
    catch (IOException e) {
      System.out.println("Unable to load DemoMasterData.properties: " + e);
      return (null);
    }
    System.out.println("DemoMasterData.properties data loaded");

    // Now create and initialize the joblet
    Joblet job = new Joblet();
    JobConfig config = new JobConfig();
    job.setJobClassName(data.getProperty("CLASS_NAME"));
    job.setCommand(JobConstants.JOBLET_START);
    // Remove the CLASS_NAME property, since it is not needed to init the job
    data.remove("CLASS_NAME");
    // ...and set the initialization data for the job
    config.setProperties(data);
    job.setData(config);
    return (job);
  }

  /**
   *  Cleaner method going through all jobs and removing timed-out ones
   */
  private void cleanJobs() throws ClusterException {
    Iterator i = jobs.iterator();
    while (i.hasNext()) {
      Joblet job = ((Joblet)(i.next()));

      // Extract the job start time from its unique ID
      int i1 = job.getRefID().indexOf(":");
      int i2 = job.getRefID().lastIndexOf(":");
      String strDate = job.getRefID().substring(i1 + 1, i2);
      long startTime = Long.parseLong(strDate);
      long currentTime = System.currentTimeMillis();

      // Check if the job timed out
      if ((currentTime - startTime) > jobTimeOut) {
        System.out.println("Removing timed-out job [" + job.getRefID() + "]");
        i.remove();

        // Ask the logical cluster to terminate the job gracefully. The cluster
        //   will automatically identify the physical server running the job.
        job.setCommand(JobConstants.JOBLET_STOP);
        getClusterService().submitJob(job);
      }
    }
  }

  /**
   *  Example of the JobClassLoader automatically initializing properties.
   *  The JOBTIMEOUT = 50000 entry in DemoMasterJob.properties results in
   *  a call to this method by the JobClassLoader with a value of "50000".
   *
   *  Setter for the jobTimeOut property.
   *
   * @param strTimeOut The String-encoded time-out in milliseconds
   */
  public void setJobtimeout(String strTimeOut) {
    jobTimeOut = Long.parseLong(strTimeOut);
  }
}

3           System Configuration

The Medusa cluster server is configured with properties files in the config directory. These files are loaded during startup. Please make sure the entries are correct; where a numeric value is expected, enter only values that can be parsed as numbers.

 

3.1          system.properties

The system.properties file contains settings for local server resources. The default settings, with explanations, are listed below:

 

#############################
##  P E R F O R M A N C E  ##
#############################

# Defines how many concurrent threads to run for processing cluster jobs
#   Processing of jobs will be queued and delayed if more jobs are assigned
#   to the server than concurrent threads are available. If the server has
#   too many concurrent threads, processing of individual jobs may become
#   slow depending on the system's performance characteristics.
#   *Master and startup jobs are not affected by this setting.
SYSTEM_CONCURRENT_THREADS = 10


#####################################
##  L O G G I N G   O P T I O N S  ##
#####################################

# Log level (ALL, DEBUG, INFO, ERROR, NONE)
#   The server logs information for most tasks it is performing. To limit
#   the amount of logging information for the user a level can be assigned:
#   ALL  : All logging information
#   DEBUG: A verbose mode with detailed logging
#   INFO : Only essential information about important events
#   ERROR: Log only errors
#   NONE : Logging off
SYSTEM_LOG_LEVEL = INFO

# Classes and packages to be logged
#   Logging information can be limited to specified packages and classes.
#   Originating classes of logging calls are compared to a (space separated)
#   list and only 'starts with' matches will result in log entries.
#   *This is an advanced feature that should only be used for development
SYSTEM_LOG_CLASSES = server.jobs server.cluster DemoMasterJob


#########################################
##  J O B   C O N F I G U R A T I O N  ##
#########################################

# Master jobs to run when this server becomes cluster master
#   Upon startup the server will load and prepare the list of (space separated)
#   classes and start processing them when it becomes cluster master.
JOB_MASTER = DemoMasterJob

 

3.2         cluster.properties

The cluster.properties file contains the cluster configuration settings for this server. The default settings, with explanations, are listed below:

 

 

#################################
##  C L U S T E R   S E T U P  ##
#################################

# Network address and port of multicast cluster
#   The multicast address is a class D IP address and uses a standard
#   UDP port number. Class D IP addresses are in the range 224.0.0.0 to
#   239.255.255.255, inclusive. The address 224.0.0.0 is reserved and
#   should not be used.
#   *All servers in a cluster must have the same setting.
CLUSTER_ADDRESS = 228.5.6.8
CLUSTER_PORT    = 6789

# Unique ID of this server instance (0-999)
#   This parameter is of relevance when running several servers on a single
#   machine (single IP address). The number will be combined with the IP
#   address to generate a unique ID for this server.
CLUSTER_SERVER_ID = 1

# Ping-Pong timeout
#   When checking for other cluster members the server sends out a ping.
#   This value defines (in milliseconds) how long to wait for a response
#   until declaring the ping dead. The server will block during that time,
#   so the value should be minimal with consideration of network latency.
#   *Used for example when starting up the server to check for another
#    cluster master.
CLUSTER_PING_TIMEOUT = 1000

#########################
##  F A I L   O V E R  ##
#########################

# Time between transmitting heart-beats
#   Servers send out periodic heart-beats to inform other cluster members
#   of their presence. Too short a rate (in milliseconds) results in flooding
#   the network with data. Too long a period results in longer latency when
#   detecting dead members.
#   *See CLUSTER_SERVER_TIMEOUT setting below
CLUSTER_HEARTBEAT_RATE = 3000

# Time-out until a cluster member is considered dead
#   Servers maintain a list of other cluster members updated by heart-beats.
#   The server timeout defines when a cluster member is considered dead after
#   failing to send heart-beats. Too great a value will result in latency
#   when detecting and failing over from dead servers.
#   *This value must be greater than CLUSTER_HEARTBEAT_RATE
CLUSTER_SERVER_TIMEOUT = 6000

#############################
##  P E R F O R M A N C E  ##
#############################

# Maximum size of multicast data packets (in bytes)
#   Cluster data is sent in data packets of the specified size. For smaller
#   data types a smaller value will result in best performance, for larger
#   data types a larger one. Data types that do not fit into the specified
#   size will be broken up into multiple chunks, resulting in additional
#   network overhead.
#   *Recommended minimum = 1000
CLUSTER_PACKET_MAX_SIZE = 3500

# Time-out for incomplete multicast packet chunks
#   When cluster data size exceeds CLUSTER_PACKET_MAX_SIZE it is broken into
#   multiple chunks. This value (in milliseconds) defines how long to wait for
#   all the chunks to arrive before disposing of orphaned segments and assuming
#   the transmission failed. Network latency needs to be considered when setting
#   this value. Too large a value can result in server overhead for storing and
#   maintaining a large list of incomplete chunks.
CLUSTER_WRAPPER_TIMEOUT = 100

# Relative processing power of this server
#   A value (1-999) that is used by certain load balancing algorithms.
CLUSTER_SERVER_WEIGHT = 100

###################################
##  L O A D   B A L A N C I N G  ##
###################################

# The load balancing algorithm to select cluster resources
#  CLUSTER_LOADBALANCING_ROUNDROBIN: Sequential circular selection
CLUSTER_LOADBALANCING = CLUSTER_LOADBALANCING_ROUNDROBIN

4           Quick Start

To test the Medusa cluster server on a single machine do the following:

 

-         Unzip the provided file into an empty directory and rename the directory to clusterserver1

-         Make a copy of that directory and rename it to clusterserver2

-         Change CLUSTER_SERVER_ID in …/clusterserver1/config/cluster.properties to 1

-         Change CLUSTER_SERVER_ID in …/clusterserver2/config/cluster.properties to 2

-         Open a terminal window in each of the two clusterserver directories and type:
java -jar clusterserver.jar

 

This will start and cluster the two instances and run the demo jobs shown in the examples above.

5           Contact

For support please email: support@isoplane.com.