Creating an Interval Scheduler with Quartz and Sagas

Tech Stack:

Axon Version 4.1.1
No Axon Server
Spring Boot
Using TrackingEventProcessors

I have an aggregate that represents a delivery, with a date. On certain arbitrarily defined days since the delivery, a communication task may need to be performed. I will not know ahead of time which day since delivery, nor which task, so I will have to check a configuration once per day to let me know if there are any tasks for today.

I want to use the DeadlineManager to create an interval scheduler where, once per day, a Saga related to this delivery:

  1. wakes up
  2. calculates how many days ago the delivery was
  3. checks to see if there are tasks for today (for example, if the delivery was 5 days ago, are there any Day 5 tasks?)
    3.a. performs the task(s) if there are any to be performed
  4. schedules another deadline for tomorrow, to start the process all over again
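
Steps 2 and 3 of the list above can be sketched in plain Java, independent of Axon. This is only an illustration under stated assumptions: the class name `DailyTaskCheck`, the `Map<Long, List<String>>` shape of the task configuration, and the use of an injected `Clock` are all hypothetical and not part of the original question.

```java
import java.time.Clock;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the configuration shape (day number -> task names) is an assumption.
final class DailyTaskCheck {

    private DailyTaskCheck() {
    }

    /** Whole calendar days between the delivery date and "today" as seen by the given clock. */
    static long daysSinceDelivery(LocalDate deliveryDate, Clock clock) {
        LocalDate today = LocalDate.now(clock);
        return ChronoUnit.DAYS.between(deliveryDate, today);
    }

    /** Tasks configured for the given day number, or an empty list when there are none. */
    static List<String> tasksForDay(Map<Long, List<String>> tasksByDay, long day) {
        return tasksByDay.getOrDefault(day, Collections.emptyList());
    }
}
```

Passing a `Clock` instead of calling `LocalDate.now()` directly keeps the calculation tied to real time in production while still letting a test pin "today" to a fixed date.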

I do not want all the deadlines to be at the same time of day (i.e., all tasks run at 3pm), since that could cause thousands of Sagas to wake up at once. Instead, I want each Saga to pick a random time tomorrow at which to schedule its next task deadline. These deadlines will continue to be scheduled, run, and rescheduled until a global timeout is reached (normally 60 days after delivery).
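
One way to pick the random time and apply the global timeout is sketched below, again in plain Java. The names `NextDeadline`, `randomInstantTomorrow` and `shouldReschedule` are hypothetical, and the 60-day constant is taken from the description above. "Tomorrow" is interpreted in the time zone of the supplied `Clock`, which is an assumption.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Random;

// Hypothetical helper for choosing the next deadline; not part of Axon.
final class NextDeadline {

    /** Global timeout in days after delivery, as described in the question. */
    static final int TIMEOUT_DAYS = 60;

    private NextDeadline() {
    }

    /** Picks a uniformly random instant within tomorrow, in the clock's time zone. */
    static Instant randomInstantTomorrow(Clock clock, Random random) {
        LocalDate tomorrow = LocalDate.now(clock).plusDays(1);
        ZonedDateTime start = tomorrow.atStartOfDay(clock.getZone());
        ZonedDateTime end = tomorrow.plusDays(1).atStartOfDay(clock.getZone());
        // The day length is computed rather than hard-coded, so DST changes are covered.
        long secondsInDay = Duration.between(start, end).getSeconds();
        long offset = (long) (random.nextDouble() * secondsInDay);
        return start.plusSeconds(offset).toInstant();
    }

    /** True while tomorrow still falls within the timeout window after delivery. */
    static boolean shouldReschedule(LocalDate deliveryDate, Clock clock) {
        LocalDate tomorrow = LocalDate.now(clock).plusDays(1);
        return !tomorrow.isAfter(deliveryDate.plusDays(TIMEOUT_DAYS));
    }
}
```

Inside a deadline handler, the returned `Instant` could then be handed to `DeadlineManager.schedule(...)` whenever `shouldReschedule` returns true; that wiring is left out here because it depends on the Saga in question.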

I was attempting to do this without applying events because:

  1. I need to operate off of realtime right now, not based off the date of any event
  2. the ability to replay events would be counterproductive, since the communication tasks need to be sent out in realtime
  3. Most days there will not be any task to perform, so it seems like applying the event would just fill the Event Store with noise.

I was also trying to do this without scheduling all 60 deadlines ahead of time, because it seemed like a lot to schedule all at once.

Is the above possible in Axon? I have noticed that the SagaTestFixture only allows for the assertion of scheduled deadlines at specific Instants, but I need to assert that the scheduled deadline is between an upper and lower bound (i.e., just some time tomorrow).
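
For the bounds assertion, one possible workaround is to keep the "some time tomorrow" rule in a small helper and test that helper directly, outside the fixture. The sketch below is an assumption about how that could look; `TomorrowWindow` and `isSometimeTomorrow` are hypothetical names, and the window is half-open: it includes the start of tomorrow and excludes the start of the day after.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;

// Hypothetical test helper; checks that an instant lies within tomorrow for the given clock.
final class TomorrowWindow {

    private TomorrowWindow() {
    }

    static boolean isSometimeTomorrow(Instant scheduled, Clock clock) {
        LocalDate tomorrow = LocalDate.now(clock).plusDays(1);
        Instant lower = tomorrow.atStartOfDay(clock.getZone()).toInstant();
        Instant upper = tomorrow.plusDays(1).atStartOfDay(clock.getZone()).toInstant();
        // lower bound inclusive, upper bound exclusive
        return !scheduled.isBefore(lower) && scheduled.isBefore(upper);
    }
}
```

Alternatively, if the Saga obtains its time and randomness from an injected `Clock` and a seeded `Random`, the scheduled Instant becomes deterministic in tests, so an exact-Instant assertion may be enough.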

Moreover, I have deployed my implementation of the above logic to our Dev environment, and have noticed that Quartz regularly acquires a scheduled trigger but fails to invoke the corresponding DeadlineHandler in the Saga. This effectively breaks the chain of daily tasks, since the DeadlineHandler is responsible for scheduling the next deadline. I am using the DeadlineManager elsewhere in the application without issue, which makes me think that there is something in the above implementation that the framework cannot guarantee will be performed correctly.

Thank you,

Based on your explanation, I’m not convinced that you need a Saga at all. Even better if you don’t.

It looks like you could make a deadline per aggregate or aggregate member. If you provide more input about your domain, I could give a few suggestions there…

Anyway, I had a similar problem and solved it with a custom implementation of DeadlineManager that supports cron jobs.

Implementation:

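    import java.time.Instant;
    import java.util.Date;

    import org.axonframework.deadline.DeadlineException;
    import org.axonframework.deadline.DeadlineMessage;
    import org.axonframework.deadline.quartz.DeadlineJob;
    import org.axonframework.deadline.quartz.QuartzDeadlineManager;
    import org.axonframework.messaging.ScopeDescriptor;
    import org.axonframework.serialization.Serializer;
    import org.quartz.CronScheduleBuilder;
    import org.quartz.JobBuilder;
    import org.quartz.JobDataMap;
    import org.quartz.JobDetail;
    import org.quartz.JobKey;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerException;
    import org.quartz.Trigger;
    import org.quartz.TriggerBuilder;

    import static org.axonframework.deadline.GenericDeadlineMessage.asDeadlineMessage;

    /**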
     * Implementation of DeadlineManager that supports cron job expressions
     *
     * @author Stefan Dragisic
     */
    public class CronJobDeadlineManager extends QuartzDeadlineManager {

        protected CronJobDeadlineManager(Builder builder, Scheduler scheduler, Serializer serializer) {
            super(builder);

            this.scheduler = scheduler;
            this.serializer = serializer;
        }

        private static final String JOB_NAME_PREFIX = "deadline-";
        private final Scheduler scheduler;
        private final Serializer serializer;

        public String scheduleCronJob(Instant triggerDateTime,
                               String deadlineName,
                               Object messageOrPayload,
                               ScopeDescriptor deadlineScope,
                               String cronExpression,
                               String triggerName,
                               String triggerGroup) {
            DeadlineMessage<Object> deadlineMessage = asDeadlineMessage(deadlineName, messageOrPayload, triggerDateTime);
            String deadlineId = JOB_NAME_PREFIX + deadlineMessage.getIdentifier();

            runOnPrepareCommitOrNow(() -> {
                DeadlineMessage interceptedDeadlineMessage = processDispatchInterceptors(deadlineMessage);
                try {
                    JobDetail jobDetail = makeJobDetail(interceptedDeadlineMessage,
                            deadlineScope,
                            new JobKey(deadlineId, deadlineName));
                    scheduler.scheduleJob(jobDetail, makeCronTrigger(triggerDateTime, cronExpression, jobDetail, triggerName, triggerGroup));
                } catch (SchedulerException e) {
                    throw new DeadlineException("An error occurred while setting a timer for a deadline", e);
                }
            });

            return deadlineId;
        }

        private JobDetail makeJobDetail(DeadlineMessage deadlineMessage, ScopeDescriptor deadlineScope, JobKey jobKey) {
            JobDataMap jobData = DeadlineJob.DeadlineJobDataBinder.toJobData(serializer, deadlineMessage, deadlineScope);
            return JobBuilder.newJob(DeadlineJob.class)
                    .withDescription(deadlineMessage.getPayloadType().getName())
                    .withIdentity(jobKey)
                    .usingJobData(jobData)
                    .storeDurably()
                    .build();
        }

        private static Trigger makeCronTrigger(Instant triggerDateTime,
                                               String cronExpression,
                                               JobDetail jobDetail,
                                               String triggerName,
                                               String triggerGroup) {
            return TriggerBuilder.newTrigger()
                    .withIdentity(triggerName, triggerGroup)
                    .startAt(Date.from(triggerDateTime))
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                    .forJob(jobDetail)
                    .build();

        }

    }

Config:

        @Bean public CronJobDeadlineManager quartzDeadlineManager(org.axonframework.config.Configuration config,
                                                             Scheduler scheduler,
                                                             Serializer serializer,
                                                             TransactionManager transactionManager) {
            QuartzDeadlineManager.Builder defaultDeadlineManager = QuartzDeadlineManager.builder()
                    .scheduler(scheduler)
                    .transactionManager(transactionManager)
                    .scopeAwareProvider(new ConfigurationScopeAwareProvider(config));

            return new CronJobDeadlineManager(defaultDeadlineManager, scheduler, serializer);
        }

Usage:

    @CommandHandler public void handle(SomeCommand command, CronJobDeadlineManager deadlineManager) {
        
        DeadlinePayload deadlinePayload = new DeadlinePayload(/* if you need a payload */);

        String scheduleId = deadlineManager
                .scheduleCronJob(
                        Instant.now(), // <- start at, usually right away
                        YOUR_DEADLINE_NAME, deadlinePayload,
                        Scope.describeCurrentScope(),
                        YOUR_CRON_JOB_EXPRESSION, //here you want to write Quartz Cron Job expression
                        YOUR_DEADLINE_TRIGGER_NAME,
                        YOUR_DEADLINE_GROUP_NAME);
        
        apply(new DeadlineCreatedEvent(command.getAggregateId(), scheduleId)); // <- save scheduleId in the aggregate so you can cancel the deadline later
    }



    
    @DeadlineHandler(deadlineName = YOUR_DEADLINE_NAME)
    void doOnDeadline(DeadlinePayload payload, CommandGateway commandGateway) {
        // Put this in the aggregate and do your logic here when the deadline is triggered
        if (someState) {
            commandGateway.send(new TriggerDesiredActionCommand(...));
        }
    }

When you decide to cancel:

    @CommandHandler public void handle(CancelDeadlineCommand command, DeadlineManager deadlineManager) {
        deadlineManager.cancelSchedule(YOUR_DEADLINE_NAME, scheduleId);

        apply(new DeadlineCancelledEvent(command.getAggregateId(), scheduleId));
    }

With this code, the whole scheduling logic becomes a matter of writing a single Quartz cron expression that tells Quartz: do this once per day, at this time, for the next 60 days (so that it stops automatically, or you can cancel it manually as in the example above). To build the expression, check https://www.freeformatter.com/cron-expression-generator-quartz.html
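
As an illustration of such an expression, the sketch below builds a Quartz cron string that fires once per day at a given time, and a variant that picks a random time of day so that not every aggregate fires at the same moment. The class and method names (`DailyCron`, `dailyAt`, `dailyAtRandomTime`) are hypothetical. The field order (seconds, minutes, hours, day-of-month, month, day-of-week) is the Quartz cron format.

```java
import java.util.Random;

// Hypothetical helper that builds Quartz cron expressions for a once-per-day schedule.
final class DailyCron {

    private DailyCron() {
    }

    /** Quartz cron fields: seconds minutes hours day-of-month month day-of-week. */
    static String dailyAt(int hour, int minute) {
        if (hour < 0 || hour > 23 || minute < 0 || minute > 59) {
            throw new IllegalArgumentException("hour must be 0-23 and minute 0-59");
        }
        // "?" means "no specific value" for day-of-week, since day-of-month is "*"
        return "0 " + minute + " " + hour + " * * ?";
    }

    /** Same as dailyAt, but with a randomly chosen hour and minute. */
    static String dailyAtRandomTime(Random random) {
        return dailyAt(random.nextInt(24), random.nextInt(60));
    }
}
```

For example, `DailyCron.dailyAt(15, 30)` produces `0 30 15 * * ?`, which could be passed as the `cronExpression` argument of `scheduleCronJob` above. Quartz triggers also accept an end time through `TriggerBuilder.endAt(Date)`, which may be a simpler way to stop after 60 days than encoding the window in the expression; the `makeCronTrigger` method above does not set one.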

Approaching the problem without a Saga also sounds like a good option. I had not considered that approach, because it did not seem like the “CQRS/DDD” way of solving the problem, but maybe my interpretation of CQRS/DDD is too narrow.

Proceeding without the Saga also keeps the number of entries in the Saga entry table down, which is good.

I will go and implement this and let you know how it went.

Thank you,

David