I just upgraded Axon Framework from 4.3 to 4.4, and since then I am no longer able to retrieve the initial result from a SubscriptionQueryResult, although I can still retrieve updates. With the following code I cannot retrieve any result at all (not even updates):
val query = FetchUserById(userId = user.getUserId())
val queryResult = queryGateway.subscriptionQuery(
    query,
    ResponseTypes.instanceOf(UserDetail::class.java),
    ResponseTypes.instanceOf(UserDetail::class.java)
)
Flux.using(
    { queryResult },
    {
        it.initialResult()
            .concatWith(it.updates())
            .flatMap { data ->
                Flux.interval(Duration.ofSeconds(30))
                    .map { sseEmitter(event = "heartbeat", data = "keep-alive") }
                    .startWith(sseEmitter(event = "user-info", data = Gson().toJson(data.toUserInformationDTO())))
            }
    },
    SubscriptionQueryResult<UserDetail, UserDetail>::close
)
Here is the error log from the application:
2021-03-19 12:01:11.769 WARN 1 --- [ueryProcessor-1] o.a.q.LoggingQueryInvocationErrorHandler : An error occurred while processing query message [io.unknown.easyfood.coreapi.FetchUserById]
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1756) ~[na:1.8.0_212]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) ~[na:1.8.0_212]
at org.axonframework.queryhandling.SimpleQueryBus.lambda$scatterGather$3(SimpleQueryBus.java:206) ~[axon-messaging-4.4.7.jar:4.4.7]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_212]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[na:1.8.0_212]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[na:1.8.0_212]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[na:1.8.0_212]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:1.8.0_212]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:1.8.0_212]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_212]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[na:1.8.0_212]
at org.axonframework.axonserver.connector.query.AxonServerQueryBus$QueryProcessingTask.run(AxonServerQueryBus.java:437) ~[axon-server-connector-4.4.7.jar:4.4.7]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]
However, if I remove initialResult as shown here, I can still receive incoming updates, even though no data is emitted at first:
Flux.using(
    { queryResult },
    {
        it.updates()
            .flatMap { data ->
                Flux.interval(Duration.ofSeconds(30))
                    .map { sseEmitter(event = "heartbeat", data = "keep-alive") }
                    .startWith(sseEmitter(event = "user-info", data = Gson().toJson(data.toUserInformationDTO())))
            }
    },
    SubscriptionQueryResult<UserDetail, UserDetail>::close
)
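A note on why the first snippet also loses updates: Reactor's concatWith only subscribes to updates() after initialResult() has completed, so an initial result that fails or never arrives blocks everything behind it. As a diagnostic sketch only (it does not address the TimeoutException on the query-handling side), the initial result could be made non-fatal. The helper name initialThenUpdates and the 5-second timeout are assumptions made for illustration, not part of Axon or of my project:

```kotlin
import org.axonframework.queryhandling.SubscriptionQueryResult
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical helper (not an Axon API): emits the initial result if it
// arrives within the timeout, otherwise logs the failure, skips it, and
// continues with the update stream.
fun <T : Any> initialThenUpdates(
    result: SubscriptionQueryResult<T, T>,
    initialTimeout: Duration = Duration.ofSeconds(5) // assumed value
): Flux<T> =
    result.initialResult()
        // Fail the Mono if no initial result arrives in time.
        .timeout(initialTimeout)
        // Surface the failure so it is visible in the application log.
        .doOnError { e -> println("Initial result failed: $e") }
        // Replace the error with an empty Mono so concatWith can proceed.
        .onErrorResume { Mono.empty<T>() }
        // Updates are subscribed to only once the Mono above completes.
        .concatWith(result.updates())
```

In the first snippet this would stand in for it.initialResult().concatWith(it.updates()), i.e. initialThenUpdates(it).flatMap { ... }. If updates then start flowing, that would suggest the problem is limited to how the initial query is handled.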
I tried switching back to version 4.3 and it worked again, but it still does not work with 4.4.
Here are some of my dependencies:
implementation("org.axonframework:axon-spring-boot-starter:4.4.7")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("com.novemberain:quartz-mongodb:2.2.0-rc2")