High Throughput Computing using Condor

Support for Long Running Jobs via User-level Checkpointing



Note: all of the examples can be found on the Condor server under

/opt1/condor/examples/checkpoint

Introduction

One of the main drawbacks of the current Condor pool arrangement is that if a job is running when a user logs into a PC, the job may be killed if there is significant local use of the machine. Normally this results in any output from the Condor job being lost. A job may have been running for most of the night, yet the computation is in vain if the job is evicted the following morning. Obviously this limits the throughput of longer running jobs (and here "longer" could mean more than around twenty minutes) and is wasteful of electricity. The situation can be improved through a process known as user-level checkpointing [here "user-level" is in the sense that the user needs to do the work rather than leaving it to some other program].

A checkpoint contains the frozen state of the program such that the program may be restarted (possibly on a different machine) from the point at which it was terminated so that little (ideally zero) computational effort is lost. In the so-called "standard" Condor universe, it is possible to link application codes against a Condor checkpointing library which makes the checkpointing process completely transparent. Unfortunately this is only available on UNIX-based pools and cannot be used with the Windows-based ARC Condor pool.

All is not lost though. Even when using the so-called "vanilla" Condor universe on Windows, it is still possible to make use of checkpointing provided that users include their own checkpointing code in their applications (clearly, this limits its use to applications where the source code is available and can be modified relatively easily). A couple of examples are presented here using MATLAB code which may be used as templates for other applications. The reason for choosing MATLAB is that saving and loading data using MATLAB's MAT-format files is extremely simple. This means that checkpointing can be implemented relatively easily. For C or FORTRAN codes (for example), more coding effort is needed and users are advised to read up carefully on the file I/O functions provided by these languages.


A simple example - the Fibonacci Sequence generator

To begin with, consider the example of a program which generates a Fibonacci sequence of numbers [x(0), x(1), ... x(n)], defined by the recursive equation:


x(n) = x(n-1) + x(n-2)   for integer n > 1, with x(0) = 0 and x(1) = 1

The ratio between two successive terms in the sequence:

r = x(n) / x(n-1)

asymptotically approaches a limit known as the golden ratio for large n. Although this is an extremely simple example, the use of recursion and iteration is central to many algorithms and can be applied to diverse problems in the fields of science, engineering and other disciplines.
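For reference, the limiting value follows directly from the recurrence: dividing through by x(n-1) gives

```latex
r \;=\; \lim_{n\to\infty}\frac{x(n)}{x(n-1)}
  \;=\; \lim_{n\to\infty}\left(1 + \frac{x(n-2)}{x(n-1)}\right)
  \;=\; 1 + \frac{1}{r}
  \;\;\Longrightarrow\;\;
  r^2 - r - 1 = 0
  \;\;\Longrightarrow\;\;
  r = \frac{1+\sqrt{5}}{2} \approx 1.61803
```

so the ratios printed by the program below should be seen to converge towards this value.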

A suitable MATLAB M-file function (fib.m) is:

1: function fib

2: max_terms = 10;  

3: if exist( 'output.mat' )
4:   load 'output.mat';
5:   fprintf( 'restarting after eviction\n' );   
6: else 
7:   no_of_terms = 2;
8:   x(1)= 0;
9:   x(2)= 1;
10: end

11: while( no_of_terms < max_terms )
12:   x( no_of_terms + 1 ) = x( no_of_terms ) + x( no_of_terms - 1 );
13:   ratio = x( no_of_terms + 1 ) / x( no_of_terms ); 
14:   no_of_terms = no_of_terms  + 1;
15:   fprintf( 'latest approximation of ratio after %d terms = %f\n', no_of_terms, ratio );
16:   save 'output.mat';
17:   pause( 10 );
18: end
(The line numbers are for reference only and should not be included in the actual code.)

Initially the output file, output.mat, is absent, so the term count is initialised in line 7 and the first two terms in lines 8 and 9. The recursion is implemented in the loop in lines 11-18, with new terms calculated (and stored in an array) in line 12. The approximation to the golden ratio is calculated in line 13. On first encountering this code, the loop looks as though it would be better suited to a for construction than a while construction; the reason for choosing while should become clear shortly.

In line 16, all of the current program state is saved to the file output.mat; this being the checkpoint file from which the program may be restarted. On a multi-GHz processor, the time taken for the program to run is on the microsecond scale and so a delay (arbitrarily set at 10 s) is introduced in line 17 so that the operation may be followed by somewhat slower humans.

If the program is interrupted while running and started again, the checkpoint file will be loaded in line 4, restoring all of the program's workspace. As well as the Fibonacci sequence array, the loop counter (no_of_terms) is also restored, so that the loop starting at line 11 resumes from where it left off rather than from the beginning. This is the reason for using a while loop rather than a for loop.

It is probably worth trying this example to convince yourself that it does in fact work properly. The program can be run, then interrupted (use Ctrl+C) and run again, whereupon it should pick up execution from where it left off. To run the program on the Condor pool, a standalone executable first needs to be built using e.g.


$ matlab_build fib.m

A simplified job description file such as the one below can be used for submitting the standalone executable:

executable=fib.exe
stdout = fib.out
stderr = fib.err
checkpoint_files = output.mat
log = fib.log

(Note that the checkpoint files must be specified, otherwise no output files will be returned to the Condor server.)

This would be submitted using:

$ matlab_submit fib

In this case, the program is unlikely to be evicted before it completes; however, the chances of seeing an eviction can be improved by increasing the delay in line 17. Alternatively, the job can be forcibly evicted using the command

$ condor_vacate_job <jobID>

(Condor, confusingly, seems to use the terms "vacate" and "evict" interchangeably).

You should be able to see from the standard output file, fib.out, that execution picks up from where it left off after an eviction.


A more realistic example - the Conjugate Gradient Method

The Conjugate Gradient Method is an iterative algorithm used to solve systems of equations of the form

Ax = b


where A is a square matrix of dimensions n by n and x and b are n by 1 column vectors. For the method to work, A needs to be symmetric positive-definite (equivalently, A can be constructed from the Cholesky factorisation A=LL' where L is lower triangular with strictly positive diagonal elements). The method is well suited to the large sparse systems resulting from finite difference discretisations, and its attractiveness lies in the fact that it will generally converge to a solution far more quickly than a direct solver (and with much less need for dynamic memory allocation). In exact arithmetic, convergence is guaranteed in at most n iterations.
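A standard result from the analysis of the method (quoted here without proof) relates the practical rate of convergence to the condition number of A. Measuring the error in the A-norm,

```latex
\left\lVert x_k - x^\ast \right\rVert_A
  \;\le\;
  2\left(\frac{\sqrt{\kappa}-1}{\sqrt{\kappa}+1}\right)^{k}
  \left\lVert x_0 - x^\ast \right\rVert_A,
  \qquad
  \kappa = \frac{\lambda_{\max}(A)}{\lambda_{\min}(A)}
```

so well-conditioned (or well-preconditioned) systems typically converge in far fewer than n iterations.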

A suitable MATLAB function (cg.m) is:


1: function x = cg(A,b)

2:    tol=1e-10;
    
3:    r = b + A*b;
4:    y = -r;
5:    z = A*y;
6:    s = y'*z;
7:    t = (r'*y)/s;
8:    x = -b + t*y;
  
9:    for k = 1:numel(b)
10:      r = r - t*z;
11:      if( norm(r) < tol )
12:           return;
13:      end

14:      B = (r'*z)/s;
15:      y = -r + B*y;
16:      z = A*y;
17:      s = y'*z;
18:      t = (r'*y)/s;
19:      x = x + t*y;
20:   end

Note that the actual implementation details are unimportant; the salient points of more general applicability are that:

- the algorithm is iterative, with each pass through the loop depending only on the results of the previous one;
- the complete program state consists of just a few vectors and scalars plus the loop counter, so it is cheap to save;
- the convergence test allows the loop to exit early, so the number of iterations is not known in advance.

A checkpointing implementation (conjgrad.m) might take the form of:


1: function conjgrad
    
2:    if exist('check.mat')
         % initialise data on restart if checkpoint file exists 

3:       fprintf('restarting after eviction\n');  
4:       load('check.mat');  
5:    else    
         % load co-efft matrix etc from initial input file
       
6:       load('input.mat');

         % initialise CG solver                  

7:        r = b + A*b;
8:        y = -r;
9:        z = A*y;
10:       s = y'*z;
11:       t = (r'*y)/s;
12:       x = -b + t*y;                     
13:       k=1;
14:       save('check.mat');
15:   end 
 
16:   checkpoint_interval_secs = 60;
17:   tol = 1e-10;  
 
18:   t0 = clock;
19:   while( k <= numel(b) )   
20:       r = r - t*z;
21:       if( norm(r) < tol )
22:           fprintf('converged in %d iterations\n', k );                
23:           save('output.mat', 'x');
24:           return;
25:       end
    
26:       B = (r'*z)/s;
27:       y = -r + B*y;
28:       z = A*y;
29:       s = y'*z;
30:       t = (r'*y)/s;
31:       x = x + t*y;
32:       k = k + 1;
     
          % save checkpoint if sufficient time has elapsed 
   
33:       if( etime(clock,t0) >= checkpoint_interval_secs )           
34:           fprintf('iterate = %d \n', k );  
35:           save('check.mat');
( 35a:           pause( 30 ); )
36:           t0 = clock;
37:       end    
38:    end
   

This, at first sight, appears significantly more complicated than the original example but the extra checkpointing code should fall into place quite naturally. The main changes are as follows:

- the initial data (the coefficient matrix A and right-hand side b) is loaded from a separate input file, input.mat, rather than being generated in the code;
- a checkpoint is saved only when a set time interval (60 s here) has elapsed, rather than on every iteration;
- the solution is written to a separate output file, output.mat, once the iteration has converged.

As before, it is worth trying this out interactively before using the Condor pool. A good way of generating the coefficient matrix is to use the MATLAB gallery function:

>> A = gallery('wathen',nx,ny);

This will generate a regularly structured sparse matrix typical of those resulting from finite element discretisations. Then, to create the input file:
>> n = size(A,1);
>> sol = rand(n,1);
>> b = A*sol;
>> save input.mat;

A value of nx = ny of around 250 should give a run time of roughly 10 minutes, making it possible to run the program with interruptions to check that the checkpointing works.

Again the M-file can be compiled into a standalone application for use on the Condor pool. A suitable simplified job description file is:


executable=conjgrad.exe
input_files = input.mat
output_files = output.mat
checkpoint_files = check.mat
log = cg.log
stdout = cg.out
stderr = cg.err

This can be extended to clusters of jobs e.g.

executable=conjgrad.exe
indexed_input_files = input.mat
indexed_output_files = output.mat
checkpoint_files = check.mat
indexed_log = cg.log
indexed_stdout = cg.out
indexed_stderr = cg.err
total_jobs = 2
(In this case two input files, input0.mat and input1.mat, would be needed.) Note that the checkpoint files do not need to be indexed as Condor will automatically prevent any overwriting.

As before, testing can be performed by using condor_vacate_job to forcibly evict the job - although you will need to be quick! Alternatively, a delay can be incorporated as in line 35a.


Behind the scenes

The previous discussion has rather skirted around how Condor implements the checkpoint/restart process, and it is worth describing what happens behind the scenes to avoid any confusion. The first point to bear in mind - which is certainly not obvious - is that, in general, Condor does not return the checkpoint files to the directory from which the job was submitted. Instead, on each eviction, Condor writes the files to a directory under the spool area. This directory will have a name of the form:


/condor_scratch/submit/spool/Q<n>/<X>/<Y>/cluster<X>.proc<Y>.subproc0

where <X> is the cluster ID and <Y> the process ID. For example, if a job had the ID 123.4 then its checkpoint files would be stored under:

/condor_scratch/submit/spool/Q<n>/123/4/cluster123.proc4.subproc0

where Q<n> is your scheduler queue number (you can find this at the top of the condor_q listing).

The output files from the job are only ever sent to the original job submission directory when the job terminates successfully. When the job is restarted, some or all of the files in the spool area are sent to the execute host as well as all of the original input files from the job submission directory.

This may seem overly complicated, but in fact this behaviour is extremely useful. Checkpoint files do not need to be given unique names since they are placed in separate directories, so there is no danger of overwriting. In addition, it is possible to find out the current state of any job by evicting it with condor_vacate_job. For long running jobs this provides a useful sanity check that the results are being generated correctly.

It may not be immediately clear which files are transferred to the spool directory on eviction. To avoid confusion, in the simplified job description file, use this attribute to list the checkpoint files:

checkpoint_files = ...
If you are using the ordinary job submission (".sub") files, then include the list of checkpoint files in with the list of ordinary output files. The job submission file will also need to include this line:
+CheckpointJob = TRUE
(this isn't a typo - the plus at the beginning is needed).

A safety net can be provided to prevent jobs from running for too long without returning a checkpoint (and thus running the risk of being killed immediately). This is achieved by adding the max_run_time attribute to the simplified job submission file. For example:
max_run_time = 480
would ensure that no job runs for more than eight hours (480 minutes) without being evicted and returning a checkpoint. This allows jobs to run overnight without interruption and still return a checkpoint before the PCs are used again during office hours.


The devil in the detail

There are a number of fairly subtle points to consider when implementing checkpointing. The first concerns the frequency at which checkpoints are saved. In the Conjugate Gradient example, a checkpoint could have been saved on each loop iteration, but this would likely slow things down to an unacceptable degree, so a checkpoint is saved only once a minute. Obviously there is a trade-off between the amount of work lost when an eviction happens and the extra overhead in saving the checkpoint files. This will vary from application to application and it is difficult to provide one-size-fits-all guidelines. It is worth pointing out, though, that the MATLAB profiler provides an excellent way of finding out where programs are spending most of their time.
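One commonly quoted rule of thumb (Young's approximation) can help choose the interval: if writing a checkpoint takes C seconds and the mean time between evictions is M seconds, the interval that roughly minimises the expected lost time is

```latex
\tau_{\mathrm{opt}} \;\approx\; \sqrt{2\,C\,M}
```

For example, a checkpoint that takes 5 s to write, with evictions on average every 4 hours (14400 s), suggests an interval of sqrt(2 x 5 x 14400), i.e. roughly 380 s or about six minutes. Treat this only as a starting point for experimentation.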

Another point concerns the amount of state information stored by each checkpoint. In the Conjugate Gradient example, all of the workspace is saved by each checkpoint operation although only a subset of it is actually needed. In particular, the coefficient matrix, A, need not be saved each time and could instead be loaded from the initial input file when the job restarts. By minimising the set of variables saved in each checkpoint, the overhead can be reduced, leading to more efficient execution.

A final point to consider is the length of time taken for the checkpoint file to be written. In the examples here, it has been tacitly assumed that the file is saved sufficiently quickly that the possibility of the job being evicted while it is being written can be safely ignored. If large amounts of data need to be saved, this assumption may not hold and it may be necessary to save the data to a temporary file and then update the checkpoint file only at the last moment. For example, by using:

 
save( 'checktmp.mat' );
system( 'move /y checktmp.mat check.mat' );

(The /y option forces the move without prompting first.)

Provided the source and destination are on the same drive, the operating system should guarantee that the move command is "atomic" (in the sense that it is indivisible, i.e. it succeeds completely or not at all), so that there is no danger of receiving a corrupt "half-written" checkpoint file from the job.

The final version of the solver (cgfull.m), which takes account of the above points, is given below:

1:  function cgfull
   
2:     % load co-efft matrix etc from initial input file
       
3:     load('input.mat');
     
4:     if exist('check.mat')
           % initialise data on restart if checkpoint file exists 

5:         fprintf('restarting after eviction\n');  
6:         load('check.mat');  
7:     else    
           % initialise CG solver                  

8:         r = b + A*b;
9:         y = -r;
10:        z = A*y;
11:        s = y'*z;
12:        t = (r'*y)/s;
13:        x = -b + t*y;                     
14:        k=1;
15:        save('checktmp.mat', 't', 'x', 's', 'z', 'k', 'r', 'y' );
16:        system( 'move /y checktmp.mat check.mat' );
17:    end 
 
18:    checkpoint_interval_secs = 60;
19:    tol = 1e-10;  
 
20:    t0 = clock;
21:    while( k <= numel(b) )   
22:        r = r - t*z;
23:        if( norm(r) < tol )
24:           fprintf('converged in %d iterations\n', k );               
25:           save('output.mat', 'x');
26:           return;
27:        end
    
28:        B = (r'*z)/s;
29:        y = -r + B*y;
30:        z = A*y;
31:        s = y'*z;
32:        t = (r'*y)/s;
33:        x = x + t*y;
34:        k = k + 1;
     
           % save checkpoint if sufficient time has elapsed 
   
35:        if( etime(clock,t0) >= checkpoint_interval_secs )           
36:            fprintf('iterate = %d \n', k );  
37:            save('checktmp.mat', 't', 'x', 's', 'z', 'k', 'r', 'y' );
38:            system( 'move /y checktmp.mat check.mat' );
(  38a:            %pause( 30 );                                )
39:            t0 = clock;
40:        end    
41:     end

The main points to pick out here are:

- the input file, input.mat, is now loaded unconditionally at the start (line 3), so the coefficient matrix A does not need to be stored in each checkpoint;
- only the variables actually needed for a restart (t, x, s, z, k, r and y) are saved (lines 15 and 37), reducing the checkpointing overhead;
- checkpoints are written to a temporary file and then moved into place (lines 15-16 and 37-38), so a half-written checkpoint file can never be picked up on restart.

A suitable simplified job description file is
executable = cgfull.exe 
indexed_stdout = stdout
indexed_stderr = stderr
indexed_log = log
indexed_input_files = input.mat
indexed_output_files = output.mat
checkpoint_files = check.mat
total_jobs = 2
The checkpoint file(s) are specified in a separate attribute from the rest of the output files even though they are themselves effectively output files. It is essential that the checkpoint_files attribute is used for this, as it ensures that the checkpoints are transferred back to the server on eviction (the default is not to transfer files on eviction). Although the checkpoint filename is the same for all jobs, the internal workings of Condor ensure that there is no overwriting of checkpoint files when jobs are evicted since they are stored in different directories. The corresponding Condor job description file is:
universe = vanilla
executable = cgfull.bat 
arguments = cgfull.exe $(PROCESS) input.mat output.mat check.mat
output = stdout$(PROCESS)
error = stderr$(PROCESS)
log = log$(PROCESS)
transfer_input_files = cgfull.exe.manifest, cgfull.exe, input$(PROCESS).mat, \
                       /opt1/condor/apps/matlab/index.exe,   \
                       /opt1/condor/apps/matlab/unindex.exe, \
                       /opt1/condor/apps/matlab/dummy.exe 
transfer_output_files = output$(PROCESS).mat, check.mat
should_transfer_files = YES
when_to_transfer_output = ON_EXIT_OR_EVICT
+CheckpointJob = True
requirements = ( Arch=="X86_64") && ( OpSys=="WINDOWS" )
notification = never
queue 2
The cgfull.bat file is a DOS batch file wrapper which performs the indexing/unindexing of files and is created by the matlab_submit command. If you are not using MATLAB, there is a template for this batch file in:

/opt1/condor/apps/matlab/wrapper.bat

The other executables needed (index.exe, unindex.exe and dummy.exe) are also in this directory.