To any user who stumbles on to this thread in the future (and the DistributedQueryBus is not available yet) I implemented a temporal solution using AMQP
First I created a TemporalQueryGateway class with to send the query using a RabbitTemplate. It emulates two of the methods of the projects DefaultQueryGateway (axon 3.2) to make the migration on the future a bit less painful:
import org.axonframework.queryhandling.responsetypes.ResponseType;
import org.axonframework.queryhandling.responsetypes.ResponseTypes;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Component
public class TemporalQueryGateway {
private final RabbitTemplate rabbitTemplate;
@Autowired
public TemporalQueryGateway(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public <Q, R> CompletableFuture<R> query(Q query, ResponseType<R> responseType) {
log.debug("Sending query message [{}]", query.getClass().getSimpleName());
return CompletableFuture.supplyAsync(() -> responseType.convert(
rabbitTemplate.convertSendAndReceive(QUERY_EXCHANGE_NAME,
QUERY_ROUTING_KEY,
query)
));
}
public <R, Q> CompletableFuture<R> query(Q query, Class<R> responseType) {
return query(query, ResponseTypes.instanceOf(responseType));
}
}
Next the configuration:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Configuration
@ConditionalOnProperty("axon.amqp.exchange")
public class QueryRabbitConfig {
public static final String QUERY_QUEUE_NAME = "query.queue";
public static final String QUERY_EXCHANGE_NAME = "query.exchange";
public static final String QUERY_ROUTING_KEY = "query.key";
@Bean
Exchange queryExchange() {
return ExchangeBuilder.directExchange(QUERY_EXCHANGE_NAME).durable(true).build();
}
@Bean
Queue queryQueue() {
return QueueBuilder.durable(QUERY_QUEUE_NAME).build();
}
@Bean
Binding queryBinding() {
return BindingBuilder.bind(queryQueue()).to(queryExchange()).with(QUERY_ROUTING_KEY).noargs();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
TemporalQueryGateway temporalQueryGateway(RabbitTemplate template) {
return new TemporalQueryGateway(template);
}
}
And finally the listener:
@Slf4j
@Component
@RabbitListener(queues = QueryRabbitConfig.QUERY_QUEUE_NAME)
public class UserQueryListener {
private final UserRepository repo;
@Autowired
public UserQueryListener(UserRepository repo) {
this.repo = repo;
}
@QueryHandler
@RabbitHandler
public Boolean handle(QueryUserExists query) {
log.trace("Handling query [{}]", query.getClass().getSimpleName());
return repo.exists(query.getIdUser());
}
@QueryHandler
@RabbitHandler
public SimpleUser[] handle(QueryUsersByIds query) {
log.trace("Handling query [{}]", query.getClass().getSimpleName());
Iterable<User> found = repo.findAll(QUser.user.id.in(query.getIdUsers()));
return StreamSupport.stream(found.spliterator(), false)
.map(user -> new SimpleUser(
user.getId(),
user.getNames(),
user.getSurnames(),
user.getStatus(),
user.getLastUpdated(),
user.getEmail(),
user.getImageUrl()
))
.toArray(SimpleUser[]::new);
}
}
To return multiple results use Arrays as they don’t lose the type by erasure and can be deserialized to the specific object type by the message handler.
You can then query as you would using the query gateway:
queryGateway.query(new QueryUsersByIds(ids), ResponseTypes.multipleInstancesOf(SimpleUser.class))