Support for Long Running Jobs via User-level Checkpointing
Note: all of the examples can be found on the Condor server under
/opt1/condor/examples/checkpoint
Introduction
One of the main drawbacks of the current Condor pool arrangement is that if a job is running when a user logs into a PC, the job may be killed if there is significant local use of the PC. Normally this will result in any output from the Condor job being lost. It could be the case that a job has been running most of the night and yet the computation is in vain as the job is evicted the following morning. Obviously this limits the throughput of longer running jobs (and here "longer" could mean more than around twenty minutes) and is wasteful of electricity. A way of improving the situation is possible through a process known as user-level checkpointing [here "user-level" is in the sense that the user needs to do the work rather than leaving it to some other program].
A checkpoint contains the frozen state of the program such that the program may be restarted (possibly on a different machine) from the point at which it was terminated so that little (ideally zero) computational effort is lost. In the so-called "standard" Condor universe, it is possible to link application codes against a Condor checkpointing library which makes the checkpointing process completely transparent. Unfortunately this is only available on UNIX-based pools and cannot be used with the Windows-based ARC Condor pool.
All is not lost though. Even when using the so-called "vanilla" Condor universe on Windows, it is still possible to make use of checkpointing provided that users include their own checkpointing code in their applications (clearly, this limits its use to applications where the source code is available and can be modified relatively easily). A couple of examples are presented here using MATLAB code which may be used as templates for other applications. The reason for choosing MATLAB is that saving and loading data using MATLAB's MAT-format files is extremely simple. This means that checkpointing can be implemented relatively easily. For C or FORTRAN codes (for example), more coding effort is needed and users are advised to read up carefully on the file I/O functions provided by these languages.
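For codes in other languages, the same load-if-present / compute / save-as-you-go pattern applies. As a rough, language-neutral illustration (the filenames and helper functions here are invented for this sketch and are not part of any Condor tooling), a Python version of the pattern might look like:

```python
import os
import pickle

CHECKPOINT = "state.pkl"  # hypothetical checkpoint filename

def restore_or_init():
    """Load the saved state if a checkpoint exists, else start from scratch."""
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT, "rb") as f:
            return pickle.load(f)
    return {"i": 0, "acc": 0}

def save_checkpoint(state):
    """Freeze the loop state so a restarted run can resume from here."""
    with open(CHECKPOINT, "wb") as f:
        pickle.dump(state, f)

def run(n=100):
    state = restore_or_init()
    # A while loop (not a for loop) so a restored index carries on from
    # where it left off rather than starting again at zero.
    while state["i"] < n:
        state["acc"] += state["i"]   # the "real" computation goes here
        state["i"] += 1
        save_checkpoint(state)       # in practice, checkpoint at intervals
    return state["acc"]
```

If the process is killed part-way through and `run` is called again, the loop resumes from the last saved value of the index rather than from the beginning.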
A simple example - the Fibonacci Sequence generator
To begin with, consider the example of a program which generates a Fibonacci sequence of numbers [x(0), x(1), ... x(n)], defined by the recursive equation:

x(n) = x(n-1) + x(n-2) for integer n > 1, with x(0) = 0, x(1) = 1

The ratio between two successive terms in the sequence:

r = x(n) / x(n-1)

asymptotically approaches a limit known as the golden ratio for large n. Although this is an extremely simple example, the use of recursion and iteration is central to many algorithms and can be applied to diverse problems in the fields of science, engineering and other disciplines.
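As a quick sanity check on this limit, the convergence can be verified with a few lines of code (this hypothetical snippet is independent of the MATLAB example that follows):

```python
def golden_ratio_estimate(n_terms):
    """Generate n_terms of the Fibonacci sequence and return x(n)/x(n-1)."""
    a, b = 0, 1                  # x(0), x(1)
    for _ in range(n_terms - 1):
        a, b = b, a + b          # x(n) = x(n-1) + x(n-2)
    return b / a

# The ratio approaches the golden ratio (1 + sqrt(5)) / 2 = 1.6180339887...
```

After around 40 terms the estimate already agrees with (1 + sqrt(5))/2 to better than nine decimal places.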
A suitable MATLAB M-file function (fib.m) is:
 1: function fib
 2: max_terms = 10;
 3: if exist( 'output.mat' )
 4:    load 'output.mat';
 5:    fprintf( 'restarting after eviction\n' );
 6: else
 7:    no_of_terms = 2;
 8:    x(1) = 0;
 9:    x(2) = 1;
10: end
11: while( no_of_terms < max_terms )
12:    x( no_of_terms + 1 ) = x( no_of_terms ) + x( no_of_terms - 1 );
13:    ratio = x( no_of_terms + 1 ) / x( no_of_terms );
14:    no_of_terms = no_of_terms + 1;
15:    fprintf( 'latest approximation of ratio after %d terms = %f\n', no_of_terms, ratio );
16:    save 'output.mat';
17:    pause( 10 );
18: end

(The line numbers are for reference only and should not be included in the actual code.)
Initially the output file, output.mat, is absent and the first two terms are initialised in lines 8 and 9 as well as the term count in line 7. The recursion is implemented in the loop in lines 11-18 with new terms calculated (and stored as an array) in line 12. The golden ratio is calculated in line 13. On first encountering this code, the loop looks very much like it would be more suited to a for construction rather than a while construction. The choice of while should become clear shortly.
In line 16, all of the current program state is saved to the file output.mat; this being the checkpoint file from which the program may be restarted. On a multi-GHz processor, the time taken for the program to run is on the microsecond scale and so a delay (arbitrarily set at 10 s) is introduced in line 17 so that the operation may be followed by somewhat slower humans.
If the program is interrupted while running and started again, the checkpoint file will be loaded in line 4 restoring all of the program's workspace. As well as loading the Fibonacci sequence array, the loop index is also restored so that the loop starting at line 11 starts from where it left off rather than the beginning again. This is the reason for using a while loop rather than a for loop.
It is probably worth trying this example to convince yourself that it does in fact work properly. The program can be run, then interrupted (use Ctrl+C) and run again, whereupon it should pick up execution from where it left off. To run the program on the Condor pool, a standalone executable will first need to be built using e.g.
$ matlab_build fib.m

A simplified job description file such as the one below can be used for submitting the standalone executable:
executable = fib.exe
stdout = fib.out
stderr = fib.err
checkpoint_files = output.mat
log = fib.log

(Note that the checkpoint files must be specified, otherwise no output files will be returned to the Condor server.)
This would be submitted using:
$ matlab_submit fib

In this case, the program is unlikely to be evicted before it completes; however, the chances of eviction can be increased by lengthening the delay in line 17. Alternatively, the job can be forcibly evicted using the command
$ condor_vacate_job <jobID>

(Condor, confusingly, seems to use the terms "vacate" and "evict" interchangeably.)
You should be able to see from the standard output file, fib.out, that execution picks up from where it left off after an eviction.
A more realistic example - the Conjugate Gradient Method
The Conjugate Gradient Method is an iterative algorithm used to solve systems of equations of the form:

A x = b

where A is a square matrix of dimensions n by n and x and b are n by 1 column vectors. For the method to work, A needs to be symmetric positive-definite (a sufficient - but not necessary - condition is that A can be constructed from the Cholesky factorisation A = L L' where L is lower triangular with strictly positive diagonal elements). The method is well suited to large sparse systems resulting from finite difference discretizations and its attractiveness lies in the fact that it will generally converge to a solution far quicker than a direct solver (and with much less need of dynamic memory allocation). In exact arithmetic, convergence is guaranteed in n iterations.
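Before looking at the checkpointed version, it may help to see the method in its standard textbook form. The sketch below is a generic Python implementation (it uses the usual residual/search-direction recurrences, not the exact variable names of the M-file that follows), solving a small dense system for illustration:

```python
def conjugate_gradient(A, b, tol=1e-10):
    """Solve A x = b for a symmetric positive-definite matrix A.

    A is a list of row lists, b a list of floats; pure Python, no libraries.
    """
    n = len(b)

    def matvec(M, v):
        return [sum(M[i][j] * v[j] for j in range(n)) for i in range(n)]

    def dot(u, v):
        return sum(ui * vi for ui, vi in zip(u, v))

    x = [0.0] * n            # initial guess x0 = 0
    r = b[:]                 # residual b - A x0
    p = r[:]                 # first search direction
    rs_old = dot(r, r)
    for _ in range(n):       # in exact arithmetic, at most n iterations
        Ap = matvec(A, p)
        alpha = rs_old / dot(p, Ap)
        x = [xi + alpha * pi for xi, pi in zip(x, p)]
        r = [ri - alpha * api for ri, api in zip(r, Ap)]
        rs_new = dot(r, r)
        if rs_new ** 0.5 < tol:
            break            # converged: residual norm below tolerance
        p = [ri + (rs_new / rs_old) * pi for ri, pi in zip(r, p)]
        rs_old = rs_new
    return x
```

Note that the per-iteration state is just a handful of vectors and scalars (x, r, p, rs_old and the iteration count), which is exactly what a checkpoint needs to capture.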
A suitable MATLAB function (cg.m) is:
 1: function x = cg(A,b)
 2: tol = 1e-10;
 3: r = b + A*b;
 4: y = -r;
 5: z = A*y;
 6: s = y'*z;
 7: t = (r'*y)/s;
 8: x = -b + t*y;
 9: for k = 1:numel(b)
10:    r = r - t*z;
11:    if( norm(r) < tol )
12:       return;
13:    end
14:    B = (r'*z)/s;
15:    y = -r + B*y;
16:    z = A*y;
17:    s = y'*z;
18:    t = (r'*y)/s;
19:    x = x + t*y;
20: end
Note that the actual implementation details are unimportant; the salient points of more general applicability are that:
- Lines 3-8 perform the initialisation
- The recursion at the heart of the algorithm is performed by the (for) loop in lines 9 - 20
- The latest estimate of the solution vector, x, is updated recursively on each iteration of the loop in line 19
- The convergence test is applied in lines 11-13 which, when satisfied, causes an immediate exit from the main loop and the function.
A checkpointing implementation (conjgrad.m) might take the form of:
 1: function conjgrad
 2: if exist('check.mat')  % initialise data on restart if checkpoint file exists
 3:    fprintf('restarting after eviction\n');
 4:    load('check.mat');
 5: else                   % load co-efft matrix etc from initial input file
 6:    load('input.mat');  % initialise CG solver
 7:    r = b + A*b;
 8:    y = -r;
 9:    z = A*y;
10:    s = y'*z;
11:    t = (r'*y)/s;
12:    x = -b + t*y;
13:    k = 1;
14:    save('check.mat');
15: end
16: checkpoint_interval_secs = 60;
17: tol = 1e-10;
18: t0 = clock;
19: while( k <= numel(b) )
20:    r = r - t*z;
21:    if( norm(r) < tol )
22:       fprintf('converged in %d iterations\n', k );
23:       save('output.mat', 'x');
24:       return;
25:    end
26:    B = (r'*z)/s;
27:    y = -r + B*y;
28:    z = A*y;
29:    s = y'*z;
30:    t = (r'*y)/s;
31:    x = x + t*y;
32:    k = k + 1;
       % save checkpoint if sufficient time has elapsed
33:    if( etime(clock,t0) >= checkpoint_interval_secs )
34:       fprintf('iterate = %d \n', k );
35:       save('check.mat');
35a:      pause( 30 );    % optional delay to make testing evictions easier
36:       t0 = clock;
37:    end
38: end
This, at first sight, appears significantly more complicated than the original example but the extra checkpointing code should fall into place quite naturally. The main changes are as follows:
- The input parameters are read from an input file in line 6 rather than being passed as function parameters (these are the co-efficient matrix, A, and the right-hand side, b).
- The original for loop has been replaced by a while loop in lines 19-38 in the same manner as the Fibonacci example.
- Inside the main loop, the current program state is saved to a checkpoint file, check.mat, at repeated intervals (line 35) rather than on each iteration as with the Fibonacci sequence example. In this example, the co-efficient matrix, A, is typically extremely sparse so that the additional cost of saving it to a file is likely to be relatively small (generally it will contain around the same number of non-zero elements as the vectors, to within an order of magnitude). Where programs deal with large (possibly multi-dimensional) dense arrays of data (such as those derived from MRI/CT), it is worth considering the impact on efficiency of saving the entire workspace each time. In this case, there will be considerable overhead and it may be more sensible to reload the data from the initial input file(s) rather than the checkpoint file.
- The checkpoint file is loaded (if present) in line 4, reinitialising all of the variables.
- In line 14, a checkpoint is saved after the initialisation to counter the possibility of the program being interrupted before the checkpoint at line 35 (this is unlikely but nonetheless possible).
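The elapsed-time guard used in lines 33-37 generalises to any language: keep a timer, and only write the checkpoint once a chosen interval has passed. A minimal Python sketch of just this pattern (the `save` callback is a stand-in for writing the real checkpoint file):

```python
import time

def run_with_periodic_checkpoints(steps, interval_secs=60, save=lambda k: None):
    """Run `steps` iterations, calling `save` at most once per `interval_secs`.

    `save` stands in for writing the checkpoint file - the analogue of the
    save('check.mat') call guarded by etime(clock, t0) in the MATLAB code.
    Returns the number of checkpoints written.
    """
    saves = 0
    t0 = time.monotonic()
    for k in range(steps):
        # ... one iteration of the real computation goes here ...
        if time.monotonic() - t0 >= interval_secs:
            save(k)                  # write the checkpoint
            saves += 1
            t0 = time.monotonic()    # restart the interval timer
    return saves
```

A monotonic clock is used rather than wall-clock time so that the interval measurement is unaffected by system clock adjustments.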
To test the solver, a suitable co-efficient matrix can be created using MATLAB's gallery function:

>> A = gallery('wathen',nx,ny);

This will generate a regularly structured sparse matrix typical of those resulting from finite difference methods. Then to create the input file:
>> [n,n] = size(A);
>> sol = rand(n,1);
>> b = A*sol;
>> save input.mat;
A value of nx = ny of around 250 should give a run time of around ten minutes, so that it is possible to run the program with interruptions to ensure that the checkpointing works.
Again the M-file can be compiled into a standalone application for use on the Condor pool. A suitable simplified job description file is:
executable = conjgrad.exe
input_files = input.mat
output_files = output.mat
checkpoint_files = check.mat
log = cg.log
stdout = cg.out
stderr = cg.err

This can be extended to clusters of jobs e.g.
executable = conjgrad.exe
indexed_input_files = input.mat
indexed_output_files = output.mat
checkpoint_files = check.mat
indexed_log = cg.log
indexed_stdout = cg.out
indexed_stderr = cg.err
total_jobs = 2

(In this case two input files, input0.mat and input1.mat, would be needed.) Note that the checkpoint files do not need to be indexed as Condor will automatically prevent any overwriting.
As before, testing can be performed by using condor_vacate_job to forcibly evict the job - although you will need to be quick! Alternatively, a delay can be incorporated as in line 35a.
Behind the scenes
The previous discussion has rather skirted around how Condor implements the checkpoint/restart process and it is worth describing what is happening behind the scenes to avoid any confusion. The first point to bear in mind - which is certainly not obvious - is that, in general, Condor does not return the checkpoint files to the directory from which the job was submitted. Instead, on each eviction, Condor writes the files to a directory under the spool area. This directory will have a name of the form:
/condor_scratch/submit/spool/Q<n>/<X>/<Y>/cluster<X>.proc<Y>.subproc0
where <X> is the cluster ID and <Y> the process ID. For example, if a job had the ID 123.4 then its checkpoint files would be stored under:
/condor_scratch/submit/spool/Q<n>/123/4/cluster123.proc4.subproc0
where Q<n> is your scheduler queue number (you can find this at the top of the condor_q listing).
The output files from the job are only ever sent to the original job submission directory when the job terminates successfully. When the job is restarted, some or all of the files in the spool area are sent to the execute host as well as all of the original input files from the job submission directory.
This may seem overly complicated but in fact this behaviour is extremely useful. Checkpoint files do not need to be given unique names as they are placed in separate directories and there is no danger of overwriting. In addition, it is possible to find out the current state of any job by evicting it using condor_vacate_job. For long running jobs this can provide a useful sanity check that the results are being generated correctly.
It may not be immediately clear which files are transferred to the spool directory on eviction. To avoid confusion, in the simplified job description file, use this attribute to list the checkpoint files:
checkpoint_files = ...

If you are using the ordinary job submission (".sub") files, then include the list of checkpoint files in with the list of ordinary output files. The job submission file will also need to include this line:
+CheckpointJob = TRUE

(This isn't a typo - the plus sign at the beginning is needed.) A safety net can be provided to prevent jobs from running for too long without returning a checkpoint (and thus running the risk of being killed immediately). This is achieved by adding the max_run_time attribute to the simplified job submission file. For example:
max_run_time = 480

would ensure that no job will run for more than eight hours (480 minutes) without being evicted and returning a checkpoint. This would allow jobs to run overnight without interruption and still return a checkpoint before the PCs are used again during office hours.
The devil in the detail
There are a number of fairly subtle points to consider when implementing checkpointing. The first concerns the frequency at which checkpoints are saved. In the Conjugate Gradient example, a checkpoint could have been saved on each loop iteration but this would likely slow things down to an unacceptable degree, so the checkpoint is only taken every minute. Obviously there is a trade-off between the amount of work lost when an eviction happens and the extra overhead in saving the checkpoint files. This will vary from application to application and it is difficult to provide one-size-fits-all guidelines. It is worth pointing out though that the MATLAB profiler provides an excellent way of finding out where in the code programs are spending most of their time.
Another point concerns the amount of state information stored by each checkpoint. In the Conjugate Gradient example, all of the workspace is saved by each checkpoint operation although only a subset of it is actually needed. In particular, the co-efficient matrix, A, need not be saved each time and could be loaded from the initial input file when the job restarts. By minimising the set of variables saved in each checkpoint, the overhead can be reduced, leading to more efficient execution.
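The same idea as MATLAB's save('check.mat','t','x',...) - saving a named subset of variables rather than the whole workspace - can be expressed in other languages too. A hypothetical Python sketch (the filenames and helpers here are invented for illustration):

```python
import pickle

def save_subset(path, workspace, names):
    """Checkpoint only the named variables from a workspace dict."""
    with open(path, "wb") as f:
        pickle.dump({name: workspace[name] for name in names}, f)

def load_subset(path):
    """Restore the saved variables as a dict on restart."""
    with open(path, "rb") as f:
        return pickle.load(f)
```

Anything not listed (such as a large constant matrix) is simply reloaded from the original input file on restart instead of being written out on every checkpoint.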
A final point to consider is the length of time taken for the checkpoint file to be written. In the examples here, it has been tacitly assumed that the file is saved sufficiently quickly that the possibility of the job being evicted while it is being written can be safely ignored. If large amounts of data need to be saved, this assumption may not hold and it may be necessary to save the data to a temporary file and then update the checkpoint file only at the last moment. For example by using:
save( 'checktmp.mat' );
system( 'move /y checktmp.mat check.mat' );

(The /y option forces the move without prompting first.)
The operating system should guarantee that the move command is "atomic" (in the sense that it is indivisible i.e. it succeeds completely or not at all) so that there is no danger of receiving a corrupt "half-written" checkpoint file from the job.
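The same write-then-rename trick is available in most languages. In Python, for example, it can be expressed with os.replace, which is atomic on both POSIX and Windows (the filenames here are illustrative):

```python
import os
import pickle

def atomic_checkpoint(state, path="check.pkl"):
    """Write the checkpoint to a temporary file, then rename it into place."""
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        pickle.dump(state, f)
        f.flush()
        os.fsync(f.fileno())   # ensure the bytes reach the disk before the rename
    os.replace(tmp, path)      # atomic swap - the analogue of DOS "move /y"

def load_checkpoint(path="check.pkl"):
    """A restart sees either the old complete checkpoint or the new one."""
    with open(path, "rb") as f:
        return pickle.load(f)
```

If the job is evicted mid-write, only the temporary file is corrupted; the last complete checkpoint remains intact under the real name.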
The final version of the solver (cgfull.m), which takes account of the above points, is given below:
 1: function cgfull
 2: % load co-efft matrix etc from initial input file
 3: load('input.mat');
 4: if exist('check.mat')  % initialise data on restart if checkpoint file exists
 5:    fprintf('restarting after eviction\n');
 6:    load('check.mat');
 7: else                   % initialise CG solver
 8:    r = b + A*b;
 9:    y = -r;
10:    z = A*y;
11:    s = y'*z;
12:    t = (r'*y)/s;
13:    x = -b + t*y;
14:    k = 1;
15:    save('checktmp.mat', 't', 'x', 's', 'z', 'k', 'r', 'y' );
16:    system( 'move /y checktmp.mat check.mat' );
17: end
18: checkpoint_interval_secs = 60;
19: tol = 1e-10;
20: t0 = clock;
21: while( k <= numel(b) )
22:    r = r - t*z;
23:    if( norm(r) < tol )
24:       fprintf('converged in %d iterations\n', k );
25:       save('output.mat', 'x');
26:       return;
27:    end
28:    B = (r'*z)/s;
29:    y = -r + B*y;
30:    z = A*y;
31:    s = y'*z;
32:    t = (r'*y)/s;
33:    x = x + t*y;
34:    k = k + 1;
       % save checkpoint if sufficient time has elapsed
35:    if( etime(clock,t0) >= checkpoint_interval_secs )
36:       fprintf('iterate = %d \n', k );
37:       save('checktmp.mat', 't', 'x', 's', 'z', 'k', 'r', 'y' );
38:       system( 'move /y checktmp.mat check.mat' );
38a:      % pause( 30 );   % optional delay to make testing evictions easier
39:       t0 = clock;
40:    end
41: end
The main points to pick out here are:
- Only those variables which are altered by the program are saved in the checkpoints at lines 15 and 37.
- Lines 15-16 and 37-38 guard against the possibility of the program being killed while saving a checkpoint to file.
- Those variables which are the same on each iteration (in this case just the co-efficient matrix, A, and the right-hand side, b) are loaded on each restart after an eviction in line 3.
A suitable simplified job description file is:

executable = cgfull.exe
indexed_stdout = stdout
indexed_stderr = stderr
indexed_log = log
indexed_input_files = input.mat
indexed_output_files = output.mat
checkpoint_files = check.mat
total_jobs = 2

The checkpoint file(s) are specified in a separate attribute from the rest of the output files even though they are themselves effectively output files. It is essential that the checkpoint_files attribute is used for this as it ensures that the checkpoints are transferred back to the server on eviction (the default is not to transfer files on eviction). Although the checkpoint filename is the same for all jobs, the internal workings of Condor ensure that there is no overwriting of checkpoint files when jobs are evicted since they are stored in different directories. The corresponding Condor job description file is:
universe = vanilla
executable = cgfull.bat
arguments = cgfull.exe $(PROCESS) input.mat output.mat check.mat
output = stdout$(PROCESS)
error = stderr$(PROCESS)
log = log$(PROCESS)
transfer_input_files = cgfull.exe.manifest, cgfull.exe, input$(PROCESS).mat, \
    /opt1/condor/apps/matlab/index.exe, \
    /opt1/condor/apps/matlab/unindex.exe, \
    /opt1/condor/apps/matlab/dummy.exe
transfer_output_files = output$(PROCESS).mat, check.mat
should_transfer_files = YES
when_to_transfer_output = ON_EXIT_OR_EVICT
+CheckpointJob = True
requirements = ( Arch == "X86_64" ) && ( OpSys == "WINDOWS" )
notification = never
queue 2

The cgfull.bat file is a DOS batch file wrapper which performs the indexing/unindexing of files and is created by the matlab_submit command. If you are not using MATLAB there is a template for this batch file in:
/opt1/condor/apps/matlab/wrapper.bat
The other executables needed (index.exe, unindex.exe and dummy.exe) are also in this directory.