How and where to handle data (file upload)

Hello,
I’d like to know how you are processing data files.
For example, imagine we have some file - let’s call it a template (the template is 50 MB).
So how should this be processed?
Is it OK to:

  1. create a command UploadTemplate that carries a byte[] template field
  2. in the command handler, save the template to a file store
  3. create an event TemplateUploaded that contains only a reference to that file, e.g. a String templatePath, and send it to the event bus
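Assuming a hypothetical file store and event bus (the names below are illustrative, not Axon API), the three steps might look roughly like this:

```java
import java.util.UUID;

// 1. command carrying the raw bytes
class UploadTemplate {
    final byte[] template;
    UploadTemplate(byte[] template) { this.template = template; }
}

// 3. event carrying only a reference to the stored file
class TemplateUploaded {
    final String templatePath;
    TemplateUploaded(String templatePath) { this.templatePath = templatePath; }
}

class UploadTemplateHandler {
    // 2. the command handler saves the bytes to a file store and publishes
    //    an event that references the file instead of embedding it
    String handle(UploadTemplate command) {
        String templatePath = "templates/" + UUID.randomUUID();
        // fileStore.save(templatePath, command.template);       // hypothetical store
        // eventBus.publish(new TemplateUploaded(templatePath)); // hypothetical bus
        return templatePath;
    }
}
```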

Or is there a better approach?

Thanks in advance

Hi Lukas,

that sounds like a perfectly fine approach. If your system needs to be scalable, make sure that “templatePath” is an abstract path. By that, I mean it should be an identifier rather than, for example, an absolute path to a file.
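To illustrate the idea (a sketch with made-up names): the event stores only an opaque identifier, and a separate resolver maps it to wherever the bytes currently live, so the storage can move without rewriting past events.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// The event would carry an identifier like "template-42", never a physical
// location like an absolute NFS path on one particular node.
class TemplateStore {
    private final Path baseDir;

    TemplateStore(Path baseDir) { this.baseDir = baseDir; }

    // Resolving happens at read time; relocating the store only changes baseDir,
    // while every TemplateUploaded event stays valid.
    Path resolve(String templateId) {
        return baseDir.resolve(templateId);
    }
}
```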

Cheers,

Allard

Hi Allard,
can you give me more information (or better, an example) of what you mean by an abstract path?

Many thanks

On Thursday, 6 August 2015 at 21:21:14 UTC+2, Allard Buijze wrote:

I guess Allard meant some sort of distributed storage, like an NFS path or even an S3 service URL.
Regarding your domain model, do you have a separate aggregate that is responsible for saving files?
Where is the TemplateUploaded event produced?

Cheers

For this I will only have a shared FS. I don’t currently have an aggregate for file saving.
I was thinking about a Template aggregate with an UploadTemplateFile command and a corresponding TemplateFileUploaded event.
But I still don’t get how to make this path abstract.
Since you mentioned another aggregate - should I create an aggregate for this that is responsible only for file saving (template, logo, configuration, etc…) and holds this information together with its IDs?

Thanks

On Thursday, 6 August 2015 at 23:51:54 UTC+2, Nedim Sabic wrote:

Yes, I think that’s the way to go.

Cheers

And what about managing this?
Should I manage it through a saga/process manager, or what is the best practice?

Hi Lukáš,

We handle a very similar use case. Our design is as follows:

  • HTTP POST is submitted to /{dataType}/uploadData with the file in the body (multipart/form-data)
  • The HTTP Controller takes the byte[] data and stores it in an ‘uploadRepository’. The repository returns a unique ID (“U1”) for this file
  • The Controller then takes the remaining posted fields and submits a command: SubmitUploadCommand(uploadId = "U1", fileName = "data.xlsx", …)
  • The Aggregate is created by the SubmitUploadCommand and fires an UploadSubmittedEvent
  • The command handler also has access to a ‘spreadsheet validator / parser’ service:
    Set errors = spreadsheetValidator.validateUpload(“U1”)
  • If the validator/parser service returns any errors, those are recorded as additional events, and the overall state of the upload will go into a ‘failed’ state (UploadFailedEvent)
  • The aggregate never handles the byte[] data directly, instead it passes the uploadId to a service that can retrieve the data from the repository.
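A plain-Java sketch of the controller/repository split described above (UploadRepository and the other names are stand-ins, not actual Axon or Spring API):

```java
// The repository owns the bytes; the command only ever carries the ID.
interface UploadRepository {
    String store(byte[] data);  // returns a unique ID such as "U1"
}

class SubmitUploadCommand {
    final String uploadId;
    final String fileName;
    SubmitUploadCommand(String uploadId, String fileName) {
        this.uploadId = uploadId;
        this.fileName = fileName;
    }
}

class UploadController {
    private final UploadRepository uploads;

    UploadController(UploadRepository uploads) { this.uploads = uploads; }

    // What the HTTP controller does with the multipart body:
    SubmitUploadCommand handlePost(byte[] body, String fileName) {
        String uploadId = uploads.store(body);              // bytes go to the repository
        return new SubmitUploadCommand(uploadId, fileName); // command carries only the ID
    }
}
```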

We’ve found that some of the data processing can take multiple seconds, and we created a saga for this workflow:
Upload Aggregate: UploadSubmittedEvent
Upload Processor Saga: create saga, schedule validation task
Upload Processor Saga: scheduled task runs, validate file (asynchronously from user command)
Upload Processor Saga: Send ‘success’ or ‘failed’ command to Upload Aggregate
Upload Processor Saga: saga ends
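The saga steps above could be sketched in plain Java, with a CompletableFuture standing in for Axon’s event scheduling (all names are illustrative):

```java
import java.util.concurrent.CompletableFuture;

class UploadProcessorSaga {
    // Started by an UploadSubmittedEvent carrying the upload ID. Returns the
    // command that would be sent back to the Upload aggregate once the
    // scheduled validation task has run.
    CompletableFuture<String> on(String uploadId) {
        return CompletableFuture.supplyAsync(() -> {
            boolean ok = validate(uploadId);      // runs off the user's command thread
            return ok ? "success:" + uploadId     // → 'success' command to the aggregate
                      : "failed:" + uploadId;     // → 'failed' command to the aggregate
        });                                       // the saga ends after dispatching
    }

    private boolean validate(String uploadId) {
        return !uploadId.isEmpty();               // stand-in for real file validation
    }
}
```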

We built a whole workflow around this: The upload aggregate state model goes from “submit” through “validate”, “process” to “success” (or “failed”). The aggregate only tracks the file ID (“U1”) and has the Saga or another infrastructure service handle the actual file data.

I would avoid placing large data on the command itself. In our case, we serialize the commands (logging/auditing) and we route them via JGroups (clustered command bus). Having large amounts of binary data here would break the infrastructure.

We ended up storing the data as a BLOB in the database, so it’s accessible by any VM, but shared file systems, S3 or any other storage solution would work equally well.

Taking the actual file processing out of the aggregate and into a Saga works great for giving users feedback on their potentially long-running processes. Just account for it in your state model (Upload Submitted -> Upload Validated -> Processing Started -> Upload Processed -> Deactivate Requested -> Upload Deactivated).
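One way the aggregate could enforce that state model is an explicit transition table (a sketch; FAILED is added as the terminal state mentioned earlier, and the names are illustrative):

```java
import java.util.EnumSet;

enum UploadState {
    SUBMITTED, VALIDATED, PROCESSING, PROCESSED,
    DEACTIVATE_REQUESTED, DEACTIVATED, FAILED;

    // Allowed next states, mirroring the workflow above.
    EnumSet<UploadState> next() {
        switch (this) {
            case SUBMITTED:            return EnumSet.of(VALIDATED, FAILED);
            case VALIDATED:            return EnumSet.of(PROCESSING, FAILED);
            case PROCESSING:           return EnumSet.of(PROCESSED, FAILED);
            case PROCESSED:            return EnumSet.of(DEACTIVATE_REQUESTED);
            case DEACTIVATE_REQUESTED: return EnumSet.of(DEACTIVATED);
            default:                   return EnumSet.noneOf(UploadState.class);
        }
    }

    // A command handler can reject out-of-order commands with this check.
    boolean canTransitionTo(UploadState target) {
        return next().contains(target);
    }
}
```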

If you do use Sagas for asynchronous processing, I would recommend keeping them as short-lived as possible (end the saga after every step, create a new saga if another step is needed.)

We have separate aggregates for the “Upload” and the actual objects that are being created from the uploaded data.

~Patrick

Hi Patrick,
First of all, thank you - this is what I needed to see :)

I have additional questions:

What is the uploadRepository in your case? Is it some infrastructure service, or a Spring bean that saves data as a BLOB to the DB, or is it part of the Axon framework?

> We have separate aggregates for the “Upload” and the actual objects that are being created from the uploaded data.

> If you do use Sagas for asynchronous processing, I would recommend keeping them as short-lived as possible (end the saga after every step, create a new saga if another step is needed.)

Is it possible to share some part of the code somewhere on GitHub? For me this is one of the most interesting parts of “how to do this in CQRS” (and also Axon), and it would be nice to have a reference for it.

Thanks

Our upload repository is a Spring singleton and simply executes JDBC queries.

Sorry, I’m not working on an open source project, so I can’t publish any source code.