Using a MessageStream after invoking asStream in BlockingStream

In the BlockingStream class the doc on method asStream says the following:

Note that iterating over the returned Stream may affect this MessageStream and vice versa. It is therefore not recommended to use this MessageStream after invoking this method.

Sounds like there could be a problem when iterating over this MessageStream. What would be a safe way to iterate over this MessageStream?

The use case for the above is that we want to find all domain event messages of a certain payload type that carry a certain (identifier) value.

Hi @rafiek, welcome back.

To be frank with you, I had to dig into the implementation to figure out why this piece of documentation is there.
If we check the code, you can see that BlockingStream#asStream uses the StreamUtils#asStream operation.

At first glance, looking at StreamUtils, I cannot see how invoking asStream would impact the contents of the stream:

public static <M> Stream<M> asStream(BlockingStream<M> messageStream) {
    Spliterator<M> spliterator =
            new Spliterators.AbstractSpliterator<M>(Long.MAX_VALUE, DISTINCT | NONNULL | ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super M> action) {
                    try {
                        action.accept(messageStream.nextAvailable());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    return true;
                }
            };
    return stream(spliterator, false).onClose(messageStream::close);
}

Granted, the code you’re referring to was written in 2016 by somebody who is no longer on the project, and the commit messages don’t give me any hint as to why this was drafted.

What I can say is that Axon Framework internally does not use the asStream operation at all. Instead, it uses the hasNextAvailable, peek, and nextAvailable operations on the BlockingStream.
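To illustrate that style of consumption, here is a minimal sketch of a polling loop. Note that SimpleBlockingStream is a hypothetical, queue-backed stand-in I made up for this post so the snippet runs without Axon on the classpath; it only mimics the three operations mentioned above and is not the framework's own type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PollingLoopSketch {

    /** Hypothetical stand-in for a blocking message stream; NOT the Axon interface. */
    interface SimpleBlockingStream<M> extends AutoCloseable {
        Optional<M> peek();                              // look at the next message without consuming it
        boolean hasNextAvailable();                      // true if a message can be read without blocking
        M nextAvailable() throws InterruptedException;   // consume the next message, blocking if needed
        @Override
        void close();
    }

    /** Queue-backed implementation, used only for illustration. */
    static <M> SimpleBlockingStream<M> streamOf(List<M> messages) {
        BlockingQueue<M> queue = new LinkedBlockingQueue<>(messages);
        return new SimpleBlockingStream<M>() {
            @Override public Optional<M> peek() { return Optional.ofNullable(queue.peek()); }
            @Override public boolean hasNextAvailable() { return !queue.isEmpty(); }
            @Override public M nextAvailable() throws InterruptedException { return queue.take(); }
            @Override public void close() { queue.clear(); }
        };
    }

    /** Reads everything that is currently available, without ever blocking. */
    static <M> List<M> drainAvailable(SimpleBlockingStream<M> stream) {
        List<M> result = new ArrayList<>();
        try {
            while (stream.hasNextAvailable()) {
                result.add(stream.nextAvailable());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop reading
        }
        return result;
    }

    public static void main(String[] args) {
        try (SimpleBlockingStream<String> stream = streamOf(Arrays.asList("a", "b", "c"))) {
            System.out.println(stream.peek());           // peeking leaves the message in place
            System.out.println(drainAvailable(stream));  // the loop then consumes all three
        }
    }
}
```

Because the loop checks hasNextAvailable before every read, it stops once nothing more is available instead of blocking.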

However, once you have retrieved a BlockingStream instance and invoked the asStream operation on it, the documentation simply recommends that you do not use the other operations on that instance anymore.
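My best guess at the reason is that the returned Stream and the original instance read from the same underlying source, so whatever one of them consumes is gone for the other. Below is a small sketch of that effect. It copies the shape of the spliterator from StreamUtils, but runs over a plain java.util.concurrent.BlockingQueue as a stand-in (an assumption on my side, so it works without Axon); limit(2) is only there because this kind of stream never ends on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SharedConsumptionSketch {

    /** Same shape as the quoted asStream, but over a BlockingQueue stand-in. */
    static <M> Stream<M> asStream(BlockingQueue<M> source) {
        Spliterator<M> spliterator = new Spliterators.AbstractSpliterator<M>(
                Long.MAX_VALUE, Spliterator.NONNULL | Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super M> action) {
                try {
                    action.accept(source.take()); // consumes from the shared source
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    /** Returns what the Stream saw, followed by what is left on the original source. */
    static String[] demo() {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(Arrays.asList("e1", "e2", "e3"));
        // The Stream view takes the first two elements from the source...
        List<String> viaStream = asStream(source).limit(2).collect(Collectors.toList());
        // ...so a direct read on the source afterwards only sees the third one.
        String viaSource = source.poll();
        return new String[] { String.valueOf(viaStream), viaSource };
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("via Stream: " + result[0]);
        System.out.println("via source: " + result[1]);
    }
}
```

Also note that tryAdvance blocks on the next element and always returns true, so a terminal operation without something like limit would wait forever once the source runs dry.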

So looking at your use case:

If this is done by constructing a unique BlockingStream that is used for nothing else, you should be good. At least, I don’t see why not :slight_smile:
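As a rough sketch of that idea: open a fresh stream per query, read it to the end of what is available, and keep only the payloads of the wanted type with the wanted identifier. Everything named here is hypothetical (the OrderShipped and OrderCancelled payloads, the orderId field, the findPayloads helper), and a BlockingQueue of raw payloads again stands in for the real stream, with poll() playing the role of hasNextAvailable plus nextAvailable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.function.Supplier;

class DedicatedStreamQuerySketch {

    /** Hypothetical event payloads, for illustration only. */
    static final class OrderShipped {
        final String orderId;
        OrderShipped(String orderId) { this.orderId = orderId; }
    }

    static final class OrderCancelled {
        final String orderId;
        OrderCancelled(String orderId) { this.orderId = orderId; }
    }

    /**
     * Opens a fresh stand-in stream for this query only and collects the
     * payloads of the given type that satisfy the predicate.
     */
    static <P> List<P> findPayloads(Supplier<BlockingQueue<Object>> freshStream,
                                    Class<P> payloadType,
                                    Predicate<? super P> matches) {
        BlockingQueue<Object> stream = freshStream.get(); // not shared with any other reader
        List<P> hits = new ArrayList<>();
        Object payload;
        while ((payload = stream.poll()) != null) {       // non-blocking read until drained
            if (payloadType.isInstance(payload)) {
                P typed = payloadType.cast(payload);
                if (matches.test(typed)) {
                    hits.add(typed);
                }
            }
        }
        return hits;
    }

    /** Made-up sample data. */
    static List<Object> sampleLog() {
        return Arrays.asList(
                new OrderShipped("42"), new OrderCancelled("42"),
                new OrderShipped("7"), new OrderShipped("42"));
    }

    public static void main(String[] args) {
        List<Object> log = sampleLog();
        List<OrderShipped> hits = findPayloads(
                () -> new LinkedBlockingQueue<>(log),
                OrderShipped.class,
                shipped -> shipped.orderId.equals("42"));
        System.out.println(hits.size());
    }
}
```

Since the stream is created inside the query and never handed to anyone else, it does not matter how it gets consumed.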
