I recently needed to add asynchronous task execution to a project. The project in question had these requirements:

  • Ability to execute transactional tasks in the background (like emailing a generated report to a user).
  • Ability to schedule a task so it doesn’t get executed sooner than the scheduled date.
  • Tasks execute transactional DB operations, so if the transaction fails, they should retry X times before failing.
  • Reliable execution: if a task is scheduled, it is guaranteed to be executed, even if the server fails or the thread executing it dies.
  • Also, the app is already using a DB for data persistence, so it is preferable to store the task queue there too instead of requiring extra infrastructure to manage.
  • No need for a JavaEE application server; Tomcat or Jetty should suffice.

As an additional requirement, the project size didn’t justify using Quartz or JMS, as they added too much complexity and too many dependencies to solve a problem that only requires a small fraction of the functionality these solutions provide.

That left me with Spring. Spring has support for scheduling and task execution, but the provided executors either rely on plain threads/timers or need Quartz. Plain threads/timers cover almost all of the needs, but not reliable execution: if, for example, the server is rebooted, your tasks are lost (JavaEE timers can be made persistent, but the project’s target environment was Tomcat).

So, building on Spring’s task execution capabilities, this solution adds persistent storage to the tasks to ensure reliability in their execution.

Initial requirements

For this solution to work, you just need Spring 3 with working declarative transactions (@Transactional). I’m using JPA2 for persistence and optimistic locking to ensure data integrity. If you are using different technologies, adapting the solution should be just a matter of changing the exception catching and modifying the DAOs.

Configuring Spring

As I said earlier, this solution builds on Spring’s task execution capabilities. Spring will manage the thread pool that asynchronously runs the methods marked with @Scheduled, and in those methods I will add the logic that handles the actual task execution.

Assuming you have the task schema added to your configuration file, these two lines are the only configuration required to create a thread pool of 10 threads and configure Spring to use that pool to run the annotated methods.

    <!-- A task scheduler that will call @Scheduled methods -->
    <task:scheduler id="myScheduler" pool-size="10"/>
    <task:annotation-driven scheduler="myScheduler"/>
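
For readers who have not used the task namespace, here is a rough plain-Java picture of what those two lines set up: a pool of 10 threads that calls a method at a fixed rate. This is only a sketch using java.util.concurrent; the class name PoolSketch, the method runFixedRate and the 50 ms period are made up for illustration, and Spring’s own scheduler does more than this.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PoolSketch {

    /**
     * Schedules a job at a fixed rate on a pool of the given size and waits
     * until it has fired the requested number of times (or 5 seconds pass).
     */
    static boolean runFixedRate(int poolSize, long periodMillis, int times) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch latch = new CountDownLatch(times);
        try {
            // Rough equivalent of a @Scheduled(fixedRate=...) method
            pool.scheduleAtFixedRate(latch::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Fired three times: " + runFixedRate(10, 50, 3));
    }
}
```

Note that nothing in this pool survives a restart, which is exactly the gap the persistent queue described next is meant to close.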

A holder class to store the tasks in a queue

Tasks need to be persisted, and in their persisted form they need to carry some extra information so they can be executed correctly. So each enqueued task will store this:

  • Creation time-stamp: the moment when the task was initially queued for execution.
  • Triggering time-stamp: a task cannot be executed sooner than this.
  • Started time-stamp: the exact moment when a thread starts executing this task.
  • Completed time-stamp: when a task is successfully completed, this gets filled. Along with the started time-stamp, this allows the executor to detect stalled or dead tasks.
  • Serialized task: the actual task.

My JPA2 entity is as follows:

/**
 * Persistent entity that stores an async task.
 * 
 * @author Carlos Vara
 */
@Entity
@Table(name="TASK_QUEUE")
public class QueuedTaskHolder {
 
    // Getters -----------------------------------------------------------------
 
    @Id
    @MyAppId
    public String getId() {
        if ( this.id == null ) {
            this.setId(UUIDHelper.newUUID());
        }
        return this.id;
    }
 
    @NotNull
    @Past
    @Temporal(TemporalType.TIMESTAMP)
    public Calendar getCreationStamp() {
        return this.creationStamp;
    }
 
    @Temporal(TemporalType.TIMESTAMP)
    public Calendar getTriggerStamp() {
        return triggerStamp;
    }
 
    @Past
    @Temporal(TemporalType.TIMESTAMP)
    public Calendar getStartedStamp() {
        return this.startedStamp;
    }
 
    @Past
    @Temporal(TemporalType.TIMESTAMP)
    public Calendar getCompletedStamp() {
        return this.completedStamp;
    }
 
    @Lob
    @NotNull
    public byte[] getSerializedTask() {
        return this.serializedTask;
    }
 
    @Version
    protected int getVersion() {
        return this.version;
    }
 
 
    // Setters -----------------------------------------------------------------
 
    protected void setId(String id) {
        this.id = id;
    }
 
    public void setCreationStamp(Calendar creationStamp) {
        this.creationStamp = creationStamp;
    }
 
    public void setTriggerStamp(Calendar triggerStamp) {
        this.triggerStamp = triggerStamp;
    }
 
    public void setStartedStamp(Calendar startedStamp) {
        this.startedStamp = startedStamp;
    }
 
    public void setCompletedStamp(Calendar completedStamp) {
        this.completedStamp = completedStamp;
    }
 
    public void setSerializedTask(byte[] serializedTask) {
        this.serializedTask = serializedTask;
    }
 
    public void setVersion(int version) {
        this.version = version;
    }
 
 
    // Fields ------------------------------------------------------------------
 
    private String id;
    private Calendar creationStamp;
    private Calendar triggerStamp = null;
    private Calendar startedStamp = null;
    private Calendar completedStamp = null;
    private byte[] serializedTask;
    private int version;
 
 
    // Lifecycle events --------------------------------------------------------
 
    @SuppressWarnings("unused")
    @PrePersist
    private void onAbstractBaseEntityPrePersist() {
        this.ensureId();
        this.markCreation();
    }
 
    /**
     * Ensures that the entity has a unique UUID.
     */
    private void ensureId() {
        this.getId();
    }
 
    /**
     * Sets the creation stamp to now.
     */
    private void markCreation() {
        setCreationStamp(Calendar.getInstance(TimeZone.getTimeZone("Etc/UTC")));
    }
 
 
    @Override
    public String toString() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss z");
        return new ToStringCreator(this).append("id", getId())
            .append("creationStamp", (getCreationStamp()!=null)?sdf.format(getCreationStamp().getTime()):null)
            .append("startedStamp", (getStartedStamp()!=null)?sdf.format(getStartedStamp().getTime()):null)
            .append("completedStamp", (getCompletedStamp()!=null)?sdf.format(getCompletedStamp().getTime()):null)
            .toString();
    }    
 
}
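
Read together, the stamps tell you which state a task is in. The following plain-Java sketch spells out that reading, assuming the two-hour stall threshold that the DAO below uses; the State enum and the stateOf helper are hypothetical names that do not exist in the original code.

```java
import java.util.Calendar;

class TaskStateSketch {

    enum State { SCHEDULED, READY, RUNNING, STALLED, COMPLETED }

    /** Derives a task state from its (nullable) trigger, started and completed stamps. */
    static State stateOf(Calendar trigger, Calendar started, Calendar completed,
                         Calendar now, int stalledAfterSeconds) {
        if (completed != null) {
            return State.COMPLETED;
        }
        if (started != null) {
            // Started but never completed: running, or stalled if it started too long ago
            Calendar tooLongAgo = (Calendar) now.clone();
            tooLongAgo.add(Calendar.SECOND, -stalledAfterSeconds);
            return started.before(tooLongAgo) ? State.STALLED : State.RUNNING;
        }
        if (trigger != null && !trigger.before(now)) {
            // Not started and its trigger time has not been reached yet
            return State.SCHEDULED;
        }
        return State.READY;
    }

    public static void main(String[] args) {
        Calendar now = Calendar.getInstance();
        Calendar threeHoursAgo = (Calendar) now.clone();
        threeHoursAgo.add(Calendar.HOUR_OF_DAY, -3);
        System.out.println(stateOf(null, threeHoursAgo, null, now, 7200));
    }
}
```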

A DAO to retrieve tasks from the queue

The executor will do 3 things: enqueue new tasks, get tasks from the queue and execute them, and re-queue tasks that are suspected to be stalled (usually because their executing thread has died). So the DAO has to provide operations to cover those scenarios.

The interface that defines this DAO:

/**
 * DAO operations for the {@link QueuedTaskHolder} entities.
 * 
 * @author Carlos Vara
 */
public interface QueuedTaskHolderDao {
 
    /**
     * Adds a new task to the current persistence context. The task will be
     * persisted into the database at flush/commit.
     * 
     * @param queuedTask
     *            The task to be saved (enqueued).
     */
    void persist(QueuedTaskHolder queuedTask);
 
 
    /**
     * Finder that retrieves a task by its id.
     * 
     * @param taskId
     *            The id of the requested task.
     * @return The task with that id, or <code>null</code> if no such task
     *         exists.
     */
    QueuedTaskHolder findById(String taskId);
 
 
    /**
     * @return A task which is candidate for execution. The receiving thread
     *         will need to ensure a lock on it. <code>null</code> if no
     *         candidate task is available.
     */
    QueuedTaskHolder findNextTaskForExecution();
 
 
    /**
     * @return A task which has been in execution for too long without
     *         finishing. <code>null</code> if there aren't stalled tasks.
     */
    QueuedTaskHolder findRandomStalledTask();
 
}

And my JPA2 implementation (I’m using the new typesafe criteria query):

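import java.util.Calendar;
import java.util.List;
import java.util.Random;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;

import org.springframework.stereotype.Repository;

/**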
 * JPA2 implementation of {@link QueuedTaskHolderDao}.
 * 
 * @author Carlos Vara
 */
@Repository
public class QueuedTaskHolderDaoJPA2 implements QueuedTaskHolderDao {
 
 
    // QueuedTaskDao methods ---------------------------------------------------
 
    @Override
    public void persist(QueuedTaskHolder queuedTask) {
        this.entityManager.persist(queuedTask);
    }
 
    @Override
    public QueuedTaskHolder findById(String taskId) {
        return this.entityManager.find(QueuedTaskHolder.class, taskId);
    }
 
    @Override
    public QueuedTaskHolder findNextTaskForExecution() {
 
        Calendar NOW = Calendar.getInstance();
 
        // select qth from QueuedTaskHolder qth where
        //      qth.startedStamp is null AND
        //      (qth.triggerStamp is null OR qth.triggerStamp < NOW)
        // order by qth.version ASC, qth.creationStamp ASC
        CriteriaBuilder cb = this.entityManager.getCriteriaBuilder();
        CriteriaQuery<QueuedTaskHolder> cq = cb.createQuery(QueuedTaskHolder.class);
        Root<QueuedTaskHolder> qth = cq.from(QueuedTaskHolder.class);
        cq.select(qth)
            .where(cb.and(cb.isNull(qth.get(QueuedTaskHolder_.startedStamp)), 
                    cb.or(
                            cb.isNull(qth.get(QueuedTaskHolder_.triggerStamp)),
                            cb.lessThan(qth.get(QueuedTaskHolder_.triggerStamp), NOW))))
            .orderBy(cb.asc(qth.get(QueuedTaskHolder_.version)), cb.asc(qth.get(QueuedTaskHolder_.creationStamp)));
 
        List<QueuedTaskHolder> results = this.entityManager.createQuery(cq).setMaxResults(1).getResultList();
        if ( results.isEmpty() ) {
            return null;
        }
        else {
            return results.get(0);
        }
 
    }
 
    @Override
    public QueuedTaskHolder findRandomStalledTask() {
 
        Calendar TOO_LONG_AGO = Calendar.getInstance();
        TOO_LONG_AGO.add(Calendar.SECOND, -7200);
 
        // select qth from QueuedTaskHolder qth where
        //      qth.completedStamp is null AND
        //      qth.startedStamp < TOO_LONG_AGO
        CriteriaBuilder cb = this.entityManager.getCriteriaBuilder();
        CriteriaQuery<QueuedTaskHolder> cq = cb.createQuery(QueuedTaskHolder.class);
        Root<QueuedTaskHolder> qth = cq.from(QueuedTaskHolder.class);
        cq.select(qth).where(
                cb.and(
                        cb.isNull(qth.get(QueuedTaskHolder_.completedStamp)),
                        cb.lessThan(qth.get(QueuedTaskHolder_.startedStamp), TOO_LONG_AGO)));
 
        List<QueuedTaskHolder> stalledTasks = this.entityManager.createQuery(cq).getResultList();
 
        if ( stalledTasks.isEmpty() ) {
            return null;
        }
        else {
            Random rand = new Random(System.currentTimeMillis());
            return stalledTasks.get(rand.nextInt(stalledTasks.size()));
        }
 
    }
 
 
    // Injected dependencies ---------------------------------------------------
 
    @PersistenceContext
    private EntityManager entityManager;
 
}

As can be seen in the implementation, the “definitions” of a stalled task and the priorities given to the tasks in the queue can be easily tweaked if needed.

Currently, tasks can be retrieved from the queue as soon as their triggering stamp is reached, and they are ordered by the number of times their execution has been attempted (a trick using the version column, which grows with every update of the row) and by how old they are. It’s easy to add an extra condition, for example to never query tasks that have failed too many times.
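
To make the selection rule and the suggested extra condition concrete, here is an in-memory sketch of the same filter and ordering in plain Java. It is not the JPA query: Row is a simplified stand-in for QueuedTaskHolder, and the MAX_VERSION cut-off is a hypothetical value. Since the version grows on every update of the row and not only on failures, treat it as a rough proxy for the number of attempts.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class QueueOrderSketch {

    /** Simplified stand-in for QueuedTaskHolder; stamps are epoch millis, null when unset. */
    static final class Row {
        final String id;
        final long created;
        final Long trigger;
        final Long started;
        final int version;

        Row(String id, long created, Long trigger, Long started, int version) {
            this.id = id;
            this.created = created;
            this.trigger = trigger;
            this.started = started;
            this.version = version;
        }
    }

    /** Hypothetical cut-off: rows updated this many times are no longer offered. */
    static final int MAX_VERSION = 10;

    static Optional<Row> nextForExecution(List<Row> rows, long now) {
        return rows.stream()
                .filter(r -> r.started == null)                     // nobody is running it
                .filter(r -> r.trigger == null || r.trigger < now)  // trigger time reached
                .filter(r -> r.version < MAX_VERSION)               // the extra condition
                .min(Comparator.comparingInt((Row r) -> r.version)  // least-tried first
                        .thenComparingLong(r -> r.created));        // then oldest first
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("retried", 100L, null, null, 2),
                new Row("fresh", 200L, null, null, 0));
        System.out.println(nextForExecution(rows, 1000L).map(r -> r.id).orElse("none"));
    }
}
```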

The executor

Now, the most important piece in the system. The executor will:

  • Enqueue (persist) tasks received.
  • Retrieve tasks that need to be executed. Ensure that the current thread gets a proper lock on the task so it’s the only one attempting its execution.
  • Check for stalled tasks and re-queue them.

The first operation is synchronous and, in my scenario, gets executed in the same transaction as the operation that requests the task execution. This way, if the current transaction fails for whatever reason, no spurious tasks are queued.

The other two operations are asynchronous, and their execution is managed by the thread pool that was configured in the first step. The rate of execution can be adjusted depending on the number of tasks that your system needs to manage. Also, these methods will execute/re-queue as many tasks as they can while they have work to do, so there is no need to set the rates too high.

The executor implements Spring’s TaskExecutor interface, so it can be easily substituted by another implementation should the need for it arise.

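import java.util.Calendar;
import java.util.TimeZone;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**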
 * A task executor with persistent task queueing.
 * 
 * @author Carlos Vara
 */
@Component("MyTaskExecutor")
public class MyTaskExecutor implements TaskExecutor {
 
    final static Logger logger = LoggerFactory.getLogger(MyTaskExecutor.class);
 
    @Autowired
    protected QueuedTaskHolderDao queuedTaskDao;
 
    @Autowired
    protected Serializer serializer;
 
 
    /**
     * Additional requirement: must be run inside a transaction.
     * Currently using MANDATORY as the app won't create tasks outside a
     * transaction.
     * 
     * @see org.springframework.core.task.TaskExecutor#execute(java.lang.Runnable)
     */
    @Override
    @Transactional(propagation=Propagation.MANDATORY)
    public void execute(Runnable task) {
 
        logger.debug("Trying to enqueue: {}", task);
 
        AbstractBaseTask abt; 
        try {
            abt = AbstractBaseTask.class.cast(task);
        } catch (ClassCastException e) {
            logger.error("Only runnables that extend AbstractBaseTask are accepted.");
            throw new IllegalArgumentException("Invalid task: " + task);
        }
 
        // Serialize the task
        QueuedTaskHolder newTask = new QueuedTaskHolder();
        byte[] serializedTask = this.serializer.serializeObject(abt);
        newTask.setTriggerStamp(abt.getTriggerStamp());
 
        logger.debug("New serialized task takes {} bytes", serializedTask.length);
 
        newTask.setSerializedTask(serializedTask);
 
        // Store it in the db
        this.queuedTaskDao.persist(newTask);
 
        // POST: Task has been enqueued
    }
 
 
    /**
     * Runs enqueued tasks.
     */
    @Scheduled(fixedRate=60L*1000L) // Every minute
    public void runner() {
 
        logger.debug("Started runner {}", Thread.currentThread().getName());
 
        QueuedTaskHolder lockedTask = null;
 
        // While there is work to do...
        while ( (lockedTask = tryLockTask()) != null ) {
 
            logger.debug("Obtained lock on {}", lockedTask);
 
            // Deserialize the task
            AbstractBaseTask runnableTask = this.serializer.deserializeAndCast(lockedTask.getSerializedTask());
            runnableTask.setQueuedTaskId(lockedTask.getId());
 
            // Run it
            runnableTask.run();
        }
 
        logger.debug("Finishing runner {}, nothing else to do.", Thread.currentThread().getName());
    }
 
 
    /**
     * The hypervisor re-queues for execution possible stalled tasks.
     */
    @Scheduled(fixedRate=60L*60L*1000L) // Every hour
    public void hypervisor() {
 
        logger.debug("Started hypervisor {}", Thread.currentThread().getName());
 
        // Reset stalled tasks, one at a time to avoid too wide transactions
        while ( tryResetStalledTask() );
 
        logger.debug("Finishing hypervisor {}, nothing else to do.", Thread.currentThread().getName());
    }
 
 
    /**
     * Tries to ensure a lock on a task in order to execute it.
     * 
     * @return A locked task, or <code>null</code> if there is no task available
     *         or no lock could be obtained.
     */
    private QueuedTaskHolder tryLockTask() {
 
        int tries = 3;
 
        QueuedTaskHolder ret = null;
        while ( tries > 0 ) {
            try {
                ret = obtainLockedTask();
                return ret;
            } catch (OptimisticLockingFailureException e) {
                tries--;
            }
        }
 
        return null;
    }
 
    /**
     * Tries to reset a stalled task.
     * 
     * @return <code>true</code> if one task was successfully re-queued,
     *         <code>false</code> if no task was re-queued, either because there
     *         are no stalled tasks or because there was a conflict re-queueing
     *         it.
     */
    private boolean tryResetStalledTask() {
        int tries = 3;
 
        QueuedTaskHolder qt = null;
        while ( tries > 0 ) {
            try {
                qt = resetStalledTask();
                return qt != null;
            } catch (OptimisticLockingFailureException e) {
                tries--;
            }
        }
 
        return false;
    }
 
    /**
     * @return A locked task ready for execution, <code>null</code> if no ready
     *         task is available.
     * @throws OptimisticLockingFailureException
     *             If getting the lock fails.
     */
    @Transactional
    public QueuedTaskHolder obtainLockedTask() {
        QueuedTaskHolder qt = this.queuedTaskDao.findNextTaskForExecution();
        logger.debug("Next possible task for execution {}", qt);
        if ( qt != null ) {
            qt.setStartedStamp(Calendar.getInstance(TimeZone.getTimeZone("Etc/UTC")));
        }
        return qt;
    }
 
 
    /**
     * Tries to reset a stalled task, returns null if no stalled task was reset.
     * 
     * @return The re-queued task, <code>null</code> if no stalled task is
     *         available.
     * @throws OptimisticLockingFailureException
     *             If the stalled task is modified by another thread during
     *             re-queueing.
     */
    @Transactional
    public QueuedTaskHolder resetStalledTask() {
        QueuedTaskHolder stalledTask = this.queuedTaskDao.findRandomStalledTask();
        logger.debug("Obtained this stalledTask {}", stalledTask);
        if ( stalledTask != null ) {
            stalledTask.setStartedStamp(null);
        }
        return stalledTask;
    }
 
}
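
Both tryLockTask() and tryResetStalledTask() follow the same pattern: attempt a transactional operation and retry a bounded number of times when another thread got there first. A generic sketch of that pattern in plain Java could look like the following. ConflictException is a stand-in for Spring’s OptimisticLockingFailureException, and retryOnConflict is a hypothetical helper that is not part of the original code.

```java
import java.util.function.Supplier;

class RetrySketch {

    /** Stand-in for an optimistic locking failure. */
    static class ConflictException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Calls the action up to maxTries times, retrying only when it fails with
     * a ConflictException. Returns null if every try ended in a conflict.
     */
    static <T> T retryOnConflict(int maxTries, Supplier<T> action) {
        for (int attempt = 1; attempt <= maxTries; attempt++) {
            try {
                return action.get();
            } catch (ConflictException e) {
                // Another thread modified the row first; loop and read again
            }
        }
        return null;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retryOnConflict(3, () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new ConflictException();
            }
            return "locked";
        });
        System.out.println(result + " after " + calls[0] + " tries");
    }
}
```

Any other exception still propagates to the caller, so only genuine conflicts are retried.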

The base task and an example task

Now, to ensure that tasks execute transactionally and get correctly de-queued upon completion, some extra work has to be done during their execution. This extra functionality is centralized in an abstract base task class, from which all the tasks in the system inherit.

/**
 * Superclass for all async tasks.
 * <ul>
 *  <li>Ensures that its associated queued task is marked as completed in the same tx.</li>
 *  <li>Marks the task as serializable.</li>
 * </ul>
 * 
 * @author Carlos Vara
 */
public abstract class AbstractBaseTask implements Runnable, Serializable {
 
    final static Logger logger = LoggerFactory.getLogger(AbstractBaseTask.class);
 
 
    // Common data -------------------------------------------------------------
 
    private transient String queuedTaskId;
    private transient QueuedTaskHolder qth;
    private transient Calendar triggerStamp;
 
 
    public void setQueuedTaskId(String queuedTaskId) {
        this.queuedTaskId = queuedTaskId;
    }
 
    public String getQueuedTaskId() {
        return queuedTaskId;
    }
 
    public void setTriggerStamp(Calendar triggerStamp) {
        this.triggerStamp = triggerStamp;
    }
 
    public Calendar getTriggerStamp() {
        return triggerStamp;
    }
 
 
    // Injected components -----------------------------------------------------
 
    @Autowired(required=true)
    protected transient QueuedTaskHolderDao queuedTaskHolderDao;
 
 
    // Lifecycle methods -------------------------------------------------------
 
    /**
     * Entrance point of the task.
     * <ul>
     *  <li>Ensures that the associated task in the queue exists.</li>
     *  <li>Marks the queued task as finished upon tx commit.</li>
     *  <li>In case of tx rollback, frees the task.</li>
     * </ul>
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    final public void run() {
 
        try {
            transactionalOps();
        } catch (RuntimeException e) {
            // Free the task, so it doesn't stall
            logger.warn("Exception forced task tx rollback: {}", e);
            freeTask();
        }
 
    }
 
    @Transactional
    private void transactionalOps() {
        doInTxBeforeTask();
        doTaskInTransaction();
        doInTxAfterTask();
    }
 
    @Transactional
    private void freeTask() {
        QueuedTaskHolder task = this.queuedTaskHolderDao.findById(this.queuedTaskId);
        task.setStartedStamp(null);
    }
 
 
    /**
     * Ensures that there is an associated task and that its state is valid.
     * Shouldn't be needed, just for extra security.
     */
    private void doInTxBeforeTask() {
        this.qth = this.queuedTaskHolderDao.findById(this.queuedTaskId);
        if ( this.qth == null ) {
            throw new IllegalArgumentException("Not executing: no associated task exists: " + this.getQueuedTaskId());
        }
        if ( this.qth.getStartedStamp() == null || this.qth.getCompletedStamp() != null ) {
            throw new IllegalArgumentException("Illegal queued task status: " + this.qth);
        }
    }
 
 
    /**
     * Method to be implemented by concrete tasks where their operations are
     * performed.
     */
    public abstract void doTaskInTransaction();
 
 
    /**
     * Marks the associated task as finished.
     */
    private void doInTxAfterTask() {
        this.qth.setCompletedStamp(Calendar.getInstance());
    }
 
 
    private static final long serialVersionUID = 1L;
}

The class also holds a trigger stamp field that can be used before calling MyTaskExecutor.execute() to schedule the task for a given time and date.

A simple (and useless) example task that extends this base task:

/**
 * Logs the status of a User.
 * 
 * @author Carlos Vara
 */
@Configurable
public class ExampleTask extends AbstractBaseTask {
 
    final static Logger logger = LoggerFactory.getLogger(ExampleTask.class);
 
 
    // Injected components -----------------------------------------------------
 
    @Autowired
    private transient UserDao userDao;
 
 
    // Data --------------------------------------------------------------------
 
    private final String userId;
 
 
    public ExampleTask(String userId) {
        this.userId = userId;
    }
 
 
    /**
     * Logs the status of a user.
     */
    @Override
    public void doTaskInTransaction() {
 
        // Get the user
        User user = this.userDao.findBagById(this.userId);
        if ( user == null ) {
            logger.error("User {} doesn't exist in the system.", userId);
            return;
        }
 
        // Log the user status
        logger.info("User status: {}", user.getStatus());
    }
 
    private static final long serialVersionUID = 1L;
}

It’s important to note that I’m using Spring’s @Configurable to manage dependency injection after the tasks have been deserialized. You can solve this problem in a different way if using AspectJ isn’t a possibility.
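
The reason injection has to happen again is that transient fields are not written to the byte array, so they come back as null. The sketch below shows this with plain Java serialization and a manual re-injection step. The post does not show the Serializer implementation, so everything here (SerializerSketch, GreetTask, Greeter) is a hypothetical stand-in. In a Spring application without AspectJ, one option would be to pass the deserialized task to AutowireCapableBeanFactory.autowireBean() at the point where this sketch assigns the field by hand.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializerSketch {

    /** Hypothetical collaborator that must not travel with the task. */
    interface Greeter {
        String greet(String who);
    }

    static class GreetTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final String userId;       // task data: serialized
        transient Greeter greeter; // dependency: skipped by serialization

        GreetTask(String userId) {
            this.userId = userId;
        }

        String run() {
            return greeter.greet(userId);
        }
    }

    static byte[] serialize(Serializable obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize task", e);
        }
    }

    static <T> T deserialize(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize task", e);
        }
    }

    public static void main(String[] args) {
        GreetTask task = new GreetTask("42");
        GreetTask copy = deserialize(serialize(task), GreetTask.class);
        System.out.println("Dependency after deserializing: " + copy.greeter);
        copy.greeter = who -> "Hello, user " + who; // manual re-injection
        System.out.println(copy.run());
    }
}
```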

And finally, an example of how to use it

Finally, a simple piece of code that shows how to send a task to the background to be executed as soon as possible, and how to schedule a task so it is executed the next day:

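import java.util.Calendar;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service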
public class ExampleServiceImpl implements ExampleService {
 
    @Qualifier("MyTaskExecutor")
    @Autowired
    private TaskExecutor taskExecutor;
 
    @Transactional
    public void example() {
        // Task will execute ASAP
        this.taskExecutor.execute(new ExampleTask("1"));
        // Task won't execute until tomorrow
        ExampleTask et = new ExampleTask("2");
        Calendar tomorrow = Calendar.getInstance();
        tomorrow.add(Calendar.DAY_OF_MONTH, 1);
        et.setTriggerStamp(tomorrow);
        this.taskExecutor.execute(et);
    }
}

An explanation of a task lifetime

Given that the algorithm presented here is a bit complex, I will detail the steps in the lifetime of a task to clarify how the system ensures reliable execution.

Step 1: task queuing

A task is enqueued when calling MyTaskExecutor.execute(). The en-queuing is part of the transaction opened in the service method that creates the task, so if that transaction fails, both your service method changes and the task data are left uncommitted, which is the correct behavior.

Step 2: task locking

Your task is stored in the DB, and it has its started and completed stamps set to null. This means that it hasn’t been executed yet, and that nobody appears to be trying to execute it. The executor then tries to lock it by fetching it from the DB and setting its started stamp. If that transaction succeeds, it’s guaranteed that the thread is the only one with that task assigned. If the thread were to die now, in between transactions, the task would eventually become stalled and be re-queued by the hypervisor.
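
The guarantee comes from the version column: an update only goes through if the row still has the version that was read. The following in-memory sketch imitates that check with a compare-and-replace on a map. It is an analogy under simplified assumptions, not what JPA does internally: OptimisticClaimSketch, Row and claim are hypothetical names, and where this sketch returns false the real code gets an OptimisticLockingFailureException.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OptimisticClaimSketch {

    /** Immutable snapshot of a queue row: its version and its started stamp. */
    static final class Row {
        final int version;
        final Long started;

        Row(int version, Long started) {
            this.version = version;
            this.started = started;
        }
    }

    private final ConcurrentMap<String, Row> table = new ConcurrentHashMap<>();

    void insert(String id) {
        table.put(id, new Row(0, null));
    }

    Row read(String id) {
        return table.get(id);
    }

    /**
     * Imitates "update ... where id = ? and version = ?": the claim succeeds
     * only if the stored row is still the snapshot this worker read.
     */
    boolean claim(String id, Row seen, long now) {
        if (seen.started != null) {
            return false; // already taken when we read it
        }
        return table.replace(id, seen, new Row(seen.version + 1, now));
    }

    public static void main(String[] args) {
        OptimisticClaimSketch queue = new OptimisticClaimSketch();
        queue.insert("task-1");
        Row seenByA = queue.read("task-1");
        Row seenByB = queue.read("task-1"); // both workers read version 0
        System.out.println("Worker A claims: " + queue.claim("task-1", seenByA, 1000L));
        System.out.println("Worker B claims: " + queue.claim("task-1", seenByB, 1001L));
    }
}
```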

Step 3: task execution

Now that the thread has a lock on the task, the execution starts. A new transaction is started, and the task operations are performed inside it, along with marking the task as completed at the end of the transaction. If the transaction succeeds, the task is correctly de-queued as part of it. If it fails, an attempt is made to free the task immediately, but if this attempt also fails (or its code is never reached), the task will eventually be collected by the hypervisor.
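
Stripped of transactions, the two outcomes of this step can be sketched as follows. This is a simplified, in-memory illustration only: Row and runLocked are hypothetical names, and in the real code the completed stamp is written inside the task’s own transaction, so it is rolled back together with the task’s work.

```java
class RunOrFreeSketch {

    /** Simplified queue row; stamps are epoch millis, null when unset. */
    static final class Row {
        Long started;
        Long completed;
    }

    /**
     * Runs the work for a row that is already locked. On success the row is
     * marked as completed; on a RuntimeException the lock is released so the
     * runner can pick the task up again.
     */
    static boolean runLocked(Row row, Runnable work, long now) {
        try {
            work.run();
            row.completed = now;
            return true;
        } catch (RuntimeException e) {
            row.started = null; // free the task so it doesn't stall
            return false;
        }
    }

    public static void main(String[] args) {
        Row ok = new Row();
        ok.started = 1L;
        System.out.println("Success: " + runLocked(ok, () -> { }, 2L));

        Row failing = new Row();
        failing.started = 1L;
        boolean result = runLocked(failing, () -> {
            throw new IllegalStateException("simulated failure");
        }, 2L);
        System.out.println("Success: " + result + ", started stamp: " + failing.started);
    }
}
```

If the thread dies before either branch runs, the row keeps its started stamp and no completed stamp, which is the condition the hypervisor looks for.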

And that’s it. Hope you find it useful, please post a comment if you successfully re-use the system :-)

Edit: 2010-07-05
I shared a template project which illustrates this system at github: http://github.com/CarlosVara/spring-async-persistent-tasks