Asynchronous tasks in OmegaML
Greetings!
I have a question concerning asynchronous tasks in omega.
Currently, we have an endpoint that performs fit-generate and returns a message with status 200 to the User once the synthetic dataset has been generated.
Are there any options to make fit and generate asynchronously?
The flow should be as follows:
- User makes a request to the fit-generate endpoint
- we call om.runtime().fit()
- User gets status 200 and a message like "Your request is processing"
Meanwhile, on the backend, we:
- fit the model
- call om.runtime().predict() just after fitting completes (to generate a synthetic dataset)
- once fitting is done, the User can see the synthetic dataset created with status SUCCESS
The question is:
How can I provide such functionality when both fit() and predict() are performed asynchronously?
... and predict() is called automatically right after fit() has finished?
Comments
Are there any options to make fit and generate asynchronously?
All omega|ml tasks submitted via the runtime are asynchronous. For example
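For example, submitting fit via the runtime returns a Celery AsyncResult immediately (the model and dataset names below are illustrative):

```python
import omegaml as om

# 'mymodel', 'sample[x]', 'sample[y]' are illustrative names
result = om.runtime.model('mymodel').fit('sample[x]', 'sample[y]')
print(result.status)  # e.g. 'PENDING' while the task is still running
```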
The task status is available in AsyncResult.status; see the Celery documentation for details. Calling
result.get()
will wait on the result. The omega|ml REST API is synchronous by default, however it can be asked to run asynchronously by adding the
async=1
query parameter. In this case the current task status and result can be queried at the URI given by the Location header:
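As a sketch (host, endpoint path and credentials below are illustrative; see the omega|ml REST API documentation for the exact URIs):

```python
import requests

# illustrative host, path and credentials -- see the omega|ml REST API docs
resp = requests.put(
    'http://localhost:5000/api/v1/model/mymodel/predict',
    params={'async': 1, 'datax': 'sample[x]'},
    headers={'Authorization': 'ApiKey user:apikey'},
)
# with async=1 the API responds immediately; the task status and result
# can then be polled at the URI given in the Location header
status_uri = resp.headers.get('Location')
```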
we have an endpoint that performs fit-generate and returns a message with status 200 to the User once the synthetic dataset has been generated.
If your application makes use of the omega|ml REST API, you can use the above. In case your application provides its own endpoint, it can remember the task id (e.g. store it in the session), then get back the AsyncResult object using
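A sketch, assuming the task id was stored earlier and that the runtime's Celery app is available as om.runtime.celeryapp:

```python
from celery.result import AsyncResult
import omegaml as om

task_id = '...'  # the task id saved earlier, e.g. in the user's session
result = AsyncResult(task_id, app=om.runtime.celeryapp)
print(result.status)  # e.g. 'PENDING', 'STARTED', 'SUCCESS'
```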
This is the equivalent of the AsyncResult instance as returned by submitting the task in the first place, as shown above. This should be implemented in the application's endpoint logic, e.g. as part of restoring the user's session.
call om.runtime().predict() just after fitting completes (to generate a synthetic dataset)
This can be done by using task chaining. Since omega|ml relies on Celery, this is actually straightforward to do, however it requires a few specific steps to enable omega|ml's own preparation and handling of tasks. I will show the principle first, then point to an example that is more convenient to use, using a runtime plugin.
The principle
How to make it work
The above shows the principle, however it is too cumbersome in practice. Since release 0.13.7 we provide the following task chaining plugin:
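A sketch of how the plugin can be used via the om.runtime.sequence API (model and dataset names are illustrative):

```python
import omegaml as om

# chain fit and predict so predict runs automatically after fit completes
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit('sample[x]', 'sample[y]')
    crt.model('mymodel').predict('sample[x]')
    result = crt.run()  # returns an AsyncResult for the whole chain
```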
The following chaining types are supported: sequence, parallel, mapreduce. See
help(om.runtime.sequence)
for more examples.
Greetings!
While trying to use:
I get this error:
'OmegaRuntime' object has no attribute 'chain'
The error indicates that the plugin has not been installed. Please install it as per the plugin's documentation:
PS: The next release (planned for mid January) will include the plugin by default.
Greetings!
There are some additional questions concerning the fit-generate as an async task.
The method you mention above is working perfectly - thank you for your advice!
Additionally, I want to track and save the state of fitting/generating. What is the best approach to track the results of the fit and generate tasks separately, so the user can see whether the model was fitted or the dataset was generated?
I also want to add some additional fields (e.g. model_fitted_times) to the model/dataset metadata just after the tasks are completed. Is there a way to get a callback when a task is done, so that I can call another method?
Thank you in advance!
Hi!
I have already solved the first issue - tracking the result of the task - by getting the id of fit as the parent task and generate as the child task.
But there is still a need to get callbacks from Celery that a task has completed successfully. Using Celery signals seems like the best approach for this, but it does not work properly.
I was trying the following:
and I got the callback from this signal, like: 'omegaml.tasks.omega_fit'
But when I was trying to call task_success signal - I got nothing:
At the same time, using:
celery -A omegaml.celeryapp events
I see that both tasks were done successfully.
Are there any suggestions of how to get the callback just after the task was successfully done?
track the result of the task - by
get callbacks from Celery
task_success signal
This is best done using Celery events, not signals:
Since the 0.13.7 release the omega|ml streams (minibatch) feature allows to trace and process celery tasks easily:
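A sketch of a tracker function, assuming the minibatch streaming decorator (the stream name and window size are illustrative; the stream is assumed to be fed by the Celery event source):

```python
from minibatch import streaming

# 'celery-events' is an assumed stream name wired to the Celery event source
@streaming('celery-events', size=10)
def tracker(window):
    # window.data holds the task events collected in the current window
    for event in window.data:
        print(event)
```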
The tracker function can contain arbitrary code. If multiple windows are ready to process, the function is called in parallel (or as configured by the streaming parameters -- see omega|ml minibatch for details).
This code can be embedded in a Jupyter notebook, e.g. mytasks.ipynb, and run like this:
With this, the information of every task run by the omegaml worker is sent to the tracker streaming function, available in window.data. It is possible to process each event separately, or to batch by time or window size (e.g. every 5 seconds, every 10 messages, etc.).
Greetings!
Thank you for the response!
We are currently using omega|ml 0.13.5.
Are there any options to handle events and get info from omegaml tasks performed using this version?
Best regards,
Daria
Hello!
I also want to ask: how can I get a callback only for the task-succeeded event, using the approach you mentioned above?
Thank you in advance!
Best Regards,
Daria
The list of events to filter on can be specified as a tuple:
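For example (the CeleryEventSource import path is an assumption; check the omega|ml streams documentation for the exact module):

```python
from minibatch.contrib.celery import CeleryEventSource  # import path assumed
import omegaml as om

# only forward task-succeeded events to the stream
source = CeleryEventSource(om.runtime.celeryapp, events=('task-succeeded',))
```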
The default events are listed in CeleryEventSource.default_events:
Hi!
I have done some steps and got an error. Namely:
While trying to run the server, I see that it hangs and cannot start properly. After a while I get this error:
AttributeError: 'State' object has no attribute '__reduce_args__'
The whole traceback may be observed in the file:
Should I do some additional settings?
I also found HERE that:
Future releases will include a streaming worker built into the omega|ml native runtime. The syntax will be something like this:
I suppose it may simplify the streaming setup. When do you plan to make the next release?
Best regards, Daria