See attached screenshots.
Transactions appear to last for 6 hours, etc.
The depth of the trace (number of spans) is 1400.
In the last screenshot you can see that 2 subsequent, unrelated commands are in the same trace.
Axon Framework 4.6.2.
Axon Server SE 4.6.7
Hello @Christian_Bonami,
Thanks for reporting this. In order to investigate further, I’d like some more information.
Do the commands that are joined together really have no relation at all?
Or is one command triggering an event, which in turn triggers another command or deadline?
Perhaps we should adjust some correlation, but I need to know which one.
Thanks in advance,
Mitchell Herrijgers
No, these commands originate from the same @Controller in Spring, and are coming from subsequent websocket/rsocket messages that come from ‘outside’ (the browser).
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
The fact that we use Spring WebFlux with WebSocket/RSocket might be the cause.
A socket is opened and then multiple messages (JSON string) are sent over the socket, and each message is a different command (a user that has clicked on a button, or an external system that delivered a message that we transform into a command).
I suspect that, since all messages/commands are arriving via the same socket connection, they all receive the same traceid.
If that’s the case, there should be a means to ‘reset’ the traceid at every command. Advice?
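To make the suspicion above concrete, here is a toy model in plain Java. It is only a sketch: `TraceContext`, `Connection` and the command names are hypothetical stand-ins, not OpenTelemetry, RSocket or Axon API. It assumes the trace context is created once when the socket opens and compares that with starting a fresh root per message.

```java
import java.util.UUID;

/** Toy model only: none of these types are OpenTelemetry, RSocket or Axon API. */
final class SharedTraceIdDemo {

    /** Hypothetical stand-in for a trace context: nothing but a random id. */
    static final class TraceContext {
        final String traceId = UUID.randomUUID().toString();
    }

    /** Models a socket whose trace context is created once, when the connection opens. */
    static final class Connection {
        private final TraceContext connectionContext = new TraceContext();

        /** Suspected current behaviour: every message inherits the connection-level context. */
        String handleInheriting(String command) {
            return connectionContext.traceId;
        }

        /** Desired behaviour: every message starts its own root context. */
        String handleWithFreshRoot(String command) {
            return new TraceContext().traceId;
        }
    }

    public static void main(String[] args) {
        Connection socket = new Connection();

        // Two unrelated commands over the same socket end up in one trace.
        String a = socket.handleInheriting("FirstCommand");
        String b = socket.handleInheriting("SecondCommand");
        System.out.println("inherited, same trace: " + a.equals(b));   // true

        // Resetting per message gives each command its own trace.
        String c = socket.handleWithFreshRoot("FirstCommand");
        String d = socket.handleWithFreshRoot("SecondCommand");
        System.out.println("fresh root, same trace: " + c.equals(d));  // false
    }
}
```

If the real behaviour matches the first variant, a long-lived socket would explain both the multi-hour traces and the very large span counts.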
We are using the OpenTelemetry agent (auto-instrumentation), and although it has support for Spring in general, I don’t think it has good support for WebFlux and WebSockets.
Originally, I tried Spring Sleuth for OpenTelemetry, but this library is not compatible with axon-tracing-opentelemetry. I presume that this is because Sleuth uses an older version of the OpenTelemetry API.
Maybe a tip: could you provide a version of axon-tracing-opentelemetry that can be used with Sleuth? Sleuth has better support for WebFlux etc. Just my 5 cts.
Some code:
@Slf4j // Lombok: provides the `log` field used below
@Controller
public class GatewayController {

    static final String QUERY_DESTINATION = "socket.query";
    static final String SUBSCRIPTION_QUERY_DESTINATION = "socket.subscription.query";
    static final String COMMAND_DESTINATION = "socket.command";

    private final ReactorQueryGateway queryGateway;
    private final ReactorCommandGateway commandGateway;

    public GatewayController(ReactorQueryGateway queryGateway, ReactorCommandGateway commandGateway) {
        this.queryGateway = queryGateway;
        this.commandGateway = commandGateway;
    }

    @MessageMapping(COMMAND_DESTINATION)
    public Mono<SocketResponse<?>> command(PublicCommand command) {
        if (log.isInfoEnabled()) log.info(">> Received command {}", command.getClass().getSimpleName());
        return commandGateway.send(command)
                .map(SocketResponse::ok);
    }

    @MessageMapping(QUERY_DESTINATION)
    public Mono<SocketResponse<?>> query(Query query) {
        if (log.isInfoEnabled()) log.info("Received query {}", query.getClass().getSimpleName());
        return queryGateway.query(
                        query,
                        Object.class
                )
                .map(SocketResponse::ok);
    }

    @MessageMapping(SUBSCRIPTION_QUERY_DESTINATION)
    public Flux<SocketResponse<?>> subscriptionQuery(Query query) {
        if (log.isInfoEnabled()) log.info("Received subscriptionQuery {}", query.getClass().getSimpleName());
        return queryGateway.subscriptionQuery(
                        query,
                        Object.class
                )
                .map(SocketResponse::ok);
    }

    @MessageExceptionHandler
    public Mono<SocketResponse<ErrorResponse>> handleException(Exception exception) {
        log.error("Error", exception);
        return Mono.just(ErrorResponse.builder()
                        .errors(ErrorDetail.builder()
                                .code("VAL_00001")
                                .detail(exception.getMessage())
                                .build()
                        )
                        .build()
                )
                .map(SocketResponse::badRequest);
    }
}
@ToString
@EqualsAndHashCode
public class SocketResponse<T> {

    private int status;
    private T body;

    private SocketResponse() {
    }

    public static <T> SocketResponse<T> ok(T body) {
        SocketResponse<T> socketResponse = new SocketResponse<>();
        socketResponse.status = HttpStatus.OK.value();
        socketResponse.body = body;
        return socketResponse;
    }

    public static SocketResponse<ErrorResponse> badRequest(ErrorResponse body) {
        SocketResponse<ErrorResponse> socketResponse = new SocketResponse<>();
        socketResponse.status = HttpStatus.BAD_REQUEST.value();
        socketResponse.body = body;
        return socketResponse;
    }

    public int getStatus() {
        return status;
    }

    public T getBody() {
        return body;
    }
}
@ToString
@EqualsAndHashCode
public class SimpleTestCommand implements PublicCommand {

    private String value;

    private SimpleTestCommand() {
    }

    private SimpleTestCommand(String value) {
        this.value = value;
    }

    public static SimpleTestCommand create(String value) {
        return new SimpleTestCommand(value);
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}
...
---
# RSocket
spring:
  rsocket.server:
    port: 7000
    transport: websocket
    mapping-path: /socket
Hello @Christian_Bonami,
Thanks for your reply. I think your hypothesis is correct. The trace id might be stored in the Reactor context that RSocket reuses for message handling. The commands that are being sent are part of the parent trace on purpose; it’s beneficial to see a command being dispatched from an endpoint, for example.
I’m not sure how to improve this situation at the moment, as the creation of this parent trace is outside of the control of Axon Framework. We could provide an option to create a new root trace for each command, instead of reusing the parent.
Another thing that might help is an RSocket interceptor that creates a new trace for every call, or annotating message handlers with OpenTelemetry annotations.
Could you try to see if one of these solutions works for you?
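As a rough illustration of the interceptor idea, the sketch below wraps a handler so that every call runs under a new trace id and the previous one is restored afterwards. It is plain Java under stated assumptions: `CURRENT_TRACE` and `withFreshTrace` are hypothetical names, and a `ThreadLocal` stands in for the tracing context. A real reactive pipeline can hop between threads, so an actual implementation would rely on the tracing library's own context propagation rather than on this mechanism.

```java
import java.util.UUID;
import java.util.function.Function;

/** Toy decorator only: not the RSocket interceptor API and not OpenTelemetry. */
final class TraceResettingDemo {

    /** Hypothetical stand-in for "the trace id that is current on this thread". */
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    /** Wraps a handler so each call runs under a new trace id, then restores the previous one. */
    static <I, O> Function<I, O> withFreshTrace(Function<I, O> delegate) {
        return input -> {
            String previous = CURRENT_TRACE.get();
            CURRENT_TRACE.set(UUID.randomUUID().toString());
            try {
                return delegate.apply(input);
            } finally {
                // Restore whatever was current before the call.
                if (previous == null) {
                    CURRENT_TRACE.remove();
                } else {
                    CURRENT_TRACE.set(previous);
                }
            }
        };
    }

    public static void main(String[] args) {
        // Pretend the connection already carries a long-lived trace.
        CURRENT_TRACE.set("connection-trace");

        // The handler simply reports the trace id it observes.
        Function<String, String> handler = command -> CURRENT_TRACE.get();
        Function<String, String> intercepted = withFreshTrace(handler);

        String first = intercepted.apply("FirstCommand");
        String second = intercepted.apply("SecondCommand");

        System.out.println(first.equals(second));              // false
        System.out.println(first.equals("connection-trace"));  // false
        System.out.println(CURRENT_TRACE.get());               // connection-trace
    }
}
```

The decorator shape is what matters here: the reset happens in one place, around every call, so the handlers themselves stay untouched.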
Yes. This improves the situation:
@Slf4j
@Configuration
public class ExtraRSocketConfig implements RSocketServerCustomizer {

    private final MeterRegistry registry;

    public ExtraRSocketConfig(MeterRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void customize(io.rsocket.core.RSocketServer rSocketServer) {
        rSocketServer.interceptors(
                iRegistry -> {
                    log.info("Adding RSocket interceptors...");
                    ImmutableTag tag = new ImmutableTag("application", "ce-calendar-timespend-frontend");
                    iRegistry.forResponder(new MicrometerRSocketInterceptor(registry, tag));
                    iRegistry.forRequester(rSocketInterceptors -> {
                        rSocketInterceptors.add(new MicrometerRSocketInterceptor(registry, tag));
                        rSocketInterceptors.add(new OTelTraceResettingRSocketInterceptor());
                    });
                    iRegistry.forConnection(new MicrometerDuplexConnectionInterceptor(registry, tag));
                }
        );
    }

    static final class OTelTraceResettingRSocketInterceptor implements RSocketInterceptor {
        @Override
        public RSocket apply(RSocket rSocket) {
            return new OTelRSocket(rSocket);
        }
    }
}
and
class OTelRSocket implements RSocket {

    private final RSocket delegate;

    public OTelRSocket(RSocket delegate) {
        this.delegate = delegate;
    }

    @Override
    public void dispose() {
        delegate.dispose();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        Span span = tracer.spanBuilder("OTelRSocket fireAndForget").setNoParent().startSpan();
        return delegate
                .fireAndForget(payload)
                .doOnError(span::recordException)
                .doFinally(signalType -> span.end());
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
        Span span = tracer.spanBuilder("OTelRSocket metadataPush").setNoParent().startSpan();
        return delegate
                .metadataPush(payload)
                .doOnError(span::recordException)
                .doFinally(signalType -> span.end());
    }

    @Override
    public Mono<Void> onClose() {
        return delegate.onClose();
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        Span span = tracer.spanBuilder("OTelRSocket requestChannel").setNoParent().startSpan();
        return delegate
                .requestChannel(payloads)
                .doOnError(span::recordException)
                .doFinally(signalType -> span.end());
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(
                () -> {
                    Span span = tracer.spanBuilder("OTelRSocket requestResponse").setNoParent().startSpan();
                    return delegate
                            .requestResponse(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        Span span = tracer.spanBuilder("OTelRSocket requestStream").setNoParent().startSpan();
        return delegate
                .requestStream(payload)
                .doOnError(span::recordException)
                .doFinally(signalType -> span.end());
    }

    private static final Tracer tracer = GlobalOpenTelemetry.getTracer("be.acerta.ce.cqrs.common.gateway.rsocket.OTelRSocket");
}