Asynchronous tasks in OmegaML

Greetings!

I have a question concerning asynchronous tasks in omega|ml.

Currently, we have an endpoint that performs fit-generate and returns a message with status 200 to the user once the synthetic dataset has been generated.

Are there any options to run fit and generate asynchronously?

The flow should be as follows:

  • the user makes a request to the fit-generate endpoint
  • we call om.runtime().fit()
  • the user gets a status 200 and a message like "Your request is processing"

At the same time, on the backend we:

  • fit the model
  • call om.runtime().predict() right after the fit has finished (to generate a synthetic dataset)
  • once that is done, the user can see the generated synthetic dataset with the status SUCCESS


The question is:

How can I provide such functionality so that both fit() and predict() run asynchronously?

... and predict() is called automatically right after fit() has finished?

Comments

  • edited February 2021

    Are there any options to make fit and generate asynchronously?

    All omega|ml tasks submitted via the runtime are asynchronous. For example:

    om.runtime.model('mymodel').fit('sample[x]', 'sample[y]')
    => <AsyncResult: b552e887-9ced-4e87-a203-0a4183e4f461> #type celery.result.AsyncResult
    

    The task status is available in AsyncResult.status, see the Celery documentation for details. Calling result.get() will wait for the result.
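
    For illustration, a minimal sketch of working with the AsyncResult (assuming a model 'mymodel' and the sample datasets exist):

    import time
    import omegaml as om

    # submit the fit; this returns immediately with an AsyncResult
    result = om.runtime.model('mymodel').fit('sample[x]', 'sample[y]')

    # poll the task status without blocking
    while not result.ready():
        time.sleep(1)

    # get() returns the task's outcome (it would block while the task is running)
    print(result.status, result.get())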

    The omega|ml REST API is synchronous by default, however it can be asked to run asynchronously by adding the async=1 query parameter:

    GET /api/v1/model/mymodel/?datax=sample[x]&datay=sample[y]&async=1
    => 
    HTTP 202/ACCEPTED
    Location: /api/v1/task/b552e887-9ced-4e87-a203-0a4183e4f461/result
    

    In this case the current task status result can be queried at the URI given by Location:

    GET /api/v1/task/b552e887-9ced-4e87-a203-0a4183e4f461/result?resource_uri=/api/v1/model/mymodel/
    =>
    {"response": {"model": "mymodel", "result": ...}, 
     "status": "SUCCESS", 
     "task_id": "b552e887-9ced-4e87-a203-0a4183e4f461"
    }
    
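
    For illustration, the same flow from a Python client could look like this (a sketch; the base URL is a placeholder for your deployment):

    import time
    import requests

    BASE = 'http://localhost:5000'  # hypothetical deployment URL

    # submit the fit asynchronously; the server replies 202 with a Location header
    resp = requests.get(f'{BASE}/api/v1/model/mymodel/',
                        params={'datax': 'sample[x]', 'datay': 'sample[y]', 'async': 1})
    assert resp.status_code == 202
    location = resp.headers['Location']

    # poll the task result URI until the task has completed
    while True:
        task = requests.get(f'{BASE}{location}',
                            params={'resource_uri': '/api/v1/model/mymodel/'}).json()
        if task['status'] in ('SUCCESS', 'FAILURE'):
            break
        time.sleep(1)
    print(task['status'])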

    we have an endpoint that performs fit-generate and returns a message with status 200 to the user once the synthetic dataset has been generated.

    If your application makes use of the omega|ml REST API, you can use the above. In case your application provides its own endpoint, it can remember the task id (e.g. store it in the session) and later get back the AsyncResult object using

    om.runtime.celeryapp.AsyncResult(taskid)
    

    This is the equivalent of the AsyncResult instance returned when the task was first submitted, as shown above. This should be implemented in the application's endpoint logic, e.g. as part of restoring the user's session.
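
    As a sketch of that endpoint logic (a Django-style view; the route handlers, model name and session keys are illustrative, not omega|ml APIs):

    import omegaml as om
    from django.http import JsonResponse

    def fit_generate(request):
        # submit the fit asynchronously and remember the task id in the session
        result = om.runtime.model('mymodel').fit('sample[x]', 'sample[y]')
        request.session['taskid'] = result.id
        return JsonResponse({'message': 'Your request is processing'})

    def fit_status(request):
        # restore the AsyncResult from the stored task id to report progress
        result = om.runtime.celeryapp.AsyncResult(request.session['taskid'])
        return JsonResponse({'status': result.status})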

    call om.runtime().predict() right after the fit has finished (to generate a synthetic dataset)

    This can be done using task chaining. Since omega|ml relies on Celery, this is straightforward to do, however it requires a few specific steps to enable omega|ml's own preparation and handling of tasks. I will show the principle first, then point to a more convenient approach using a runtime plugin.

    The principle

    # get a task object
    fit_task = om.runtime.task('omegaml.tasks.omega_fit')
    predict_task = om.runtime.task('omegaml.tasks.omega_predict')
    
    # create task signatures, with omega|ml preparation applied
    fit_sig = fit_task.task.signature(...)
    predict_sig = predict_task.task.signature(...)
    
    # chain and submit tasks for processing
    from celery import chain
    result = chain(fit_sig, predict_sig).apply_async() 
    

    How to make it work

    The above shows the principle, however it is too cumbersome in practice. Since release 0.13.7 we provide the following task chaining plugin:

    # this will chain fit and predict, i.e. predict will run only if fit succeeds
    with om.runtime.sequence() as crt:
      crt.model('regmodelx').fit('sample[x]', 'sample[y]')
      crt.model('regmodelx').predict([5], rName='foox')
      result = crt.run()  # type AsyncResult

    # some time later
    print(result.get())
    

    The following chaining types are supported: sequence, parallel, mapreduce. See help(om.runtime.sequence) for more examples.
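
    For instance, a parallel group can be submitted the same way (a sketch assuming two models 'modela' and 'modelb' exist; see help(om.runtime.parallel) for the exact semantics):

    # run both fits independently; the result resolves once all tasks have finished
    with om.runtime.parallel() as crt:
        crt.model('modela').fit('sample[x]', 'sample[y]')
        crt.model('modelb').fit('sample[x]', 'sample[y]')
        result = crt.run()  # type AsyncResult
    print(result.get())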

  • Greetings!


    While trying to use:

    with om.runtime.chain() as crt:
        ...
    

    I get this error:

    'OmegaRuntime' object has no attribute 'chain'

  • The error indicates that the plugin has not been installed. Please install it as per the plugin's documentation:

    !pip install -q getgist
    !rm -f *omx_chained.py && getgist -y omegaml omx_chained.py
    import omx_chained
    

    PS: The next release (planned for mid January) will include the plugin by default.

  • edited January 2021

    Greetings!


    There are some additional questions concerning fit-generate as an async task.

    The method you mention above works perfectly - thank you for your advice!


    Additionally, I want to track and save the state of fitting/generating. What is the best approach to track the results of the fit and generate tasks separately, so the user can see whether the model was fitted or the dataset was generated?

    Also, I want to add some additional fields (e.g. model_fitted_times) to the model/dataset metadata just after the tasks are completed. Is there any option to get a callback from a task when it is done, so that I can call another method?


    Thank you in advance!

  • edited January 2021

    Hi!


    I have already solved the first issue - tracking the result of the task - by getting the id of fit as the parent task and of generate as a child task.
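
    With a Celery chain, the AsyncResult returned by crt.run() refers to the last task in the chain and its .parent attribute to the one before it, so roughly (a sketch of my approach):

    # result was returned by crt.run() on the fit/generate sequence
    generate_result = result          # the last task in the chain (generate)
    fit_result = result.parent        # the preceding task (fit)
    print('fit:', fit_result.status)
    print('generate:', generate_result.status)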


    But there is still a need to get a callback from Celery when a task has completed successfully. Using Celery signals seems like the best approach for this, but it does not work properly.


    I was trying the following:

    from celery import signals

    @signals.after_task_publish.connect(sender='omegaml.tasks.omega_fit')
    def omega_fit_task_sent_handler(sender=None, task_id=None, task=None, args=None, **kwargs):
        print(sender)
    

    and I got the callback from this signal, like: 'omegaml.tasks.omega_fit'


    But when I tried to connect the task_success signal, I got nothing:

    from celery import signals
    from omegaml.tasks import omega_fit  # the task registered as 'omegaml.tasks.omega_fit'

    @signals.task_success.connect(sender=omega_fit)
    def task_success_handler(sender=None, result=None, **kwargs):
        print(sender, result)

    At the same time, using:

    celery -A omegaml.celeryapp events

    I can see that both tasks completed successfully.


    Are there any suggestions on how to get a callback just after a task has completed successfully?

  • edited February 2021

    track the result of the task - by

    get callbacks from Celery

    task_success signal

    This is best done using Celery events, not signals:

    • Signals are an in-process callback feature and are really meant to be used by the worker itself. In omega|ml there is no supported way to use Celery signals such that your client could receive them.
    • Events are the appropriate means to monitor the execution of tasks, as they are sent via the broker (rabbitmq) and can thus be processed by any connected client. This is the same technique employed by celery events.

    Since the 0.13.7 release, the omega|ml streams (minibatch) feature makes it easy to trace and process Celery tasks:

    # setup
    import omegaml as om
    import minibatch as mb
    import minibatch.contrib.celery  # makes mb.contrib.celery available

    source = mb.contrib.celery.CeleryEventSource(om.runtime.celeryapp)
    streaming = om.streams.getl('mytasks', source=source)
    
    # process
    @streaming
    def tracker(window):
        # window.data contains the celery task status and event details
        import omegaml as om 
        om.logger.info(f'received {window.data}')
    

    The tracker function can contain arbitrary code. If multiple windows are ready to be processed, the function is called in parallel (or as configured by the streaming parameters -- see omega|ml minibatch for details).

    This code can be embedded in a Jupyter notebook, e.g. mytasks.ipynb, and run like this:

    $ om runtime script mytasks.ipynb --async
    

    With this, the information for every task run by the omega|ml worker is sent to the tracker streaming function, contained in window.data. Events can be processed individually or batched by time or window size (e.g. every 5 seconds, every 10 messages, etc.).
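
    Hypothetically, batching could be configured when getting the stream, along these lines (the size parameter follows minibatch's count-window emitters; whether om.streams.getl accepts it is an assumption to verify against the omega|ml docs):

    # assumption: batch task events into windows of up to 10 messages
    streaming = om.streams.getl('mytasks', source=source, size=10)

    @streaming
    def tracker(window):
        # window.data would then hold a batch of up to 10 task events
        import omegaml as om
        om.logger.info(f'received {len(window.data)} events')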

  • Greetings!

    Thank you for the response!

    We are currently using omega|ml 0.13.5.

    Are there any options to handle events and get information from omega|ml tasks with this version?


    Best regards,

    Daria

  • edited April 2021

    Hello!


    I also want to ask - how can I get a callback for task-succeeded using the approach you mentioned above?

    I mean - only for this event.


    Thank you in advance!


    Best Regards,

    Daria

  • The list of events to filter on can be specified as a tuple:

    CeleryEventSource(om.runtime.celeryapp, events=('task-succeeded',))
    

    The default events are listed in CeleryEventSource.default_events:

    from minibatch.contrib.celery import CeleryEventSource
    help(CeleryEventSource)
    
    class CeleryEventSource(builtins.object)
     |  A CeleryEventSource
     |  
     |  This implements a Celery event listener that forwards task
     |  information to a minibatch stream
    ...
     |  default_events = ('task-succeeded', 'task-failed')
    


  • edited April 2021

    Hi!

    I have taken some steps and run into an error. Namely:

    1. I updated omega|ml to the latest version: 0.13.7
    2. I installed minibatch==0.4.0
    3. Inside my Django application, in the file where I call om.runtime.fit(), I added these lines:
    # setup
    source = mb.contrib.celery.CeleryEventSource(om.runtime.celeryapp)
    streaming = om.streams.getl('mytasks', source=source)
    
    # process
    @streaming
    def tracker(window):
        # window.data contains the celery task status and event details
        import omegaml as om 
        om.logger.info(f'received {window.data}')
    

    While trying to run the server, I see that it hangs and cannot start properly. After a while I get this error:

    AttributeError: 'State' object has no attribute '__reduce_args__'

    The whole traceback can be seen in the attached file.


    Do I need to do some additional setup?


    I also found HERE that:

    Future releases will include a streaming worker built into the omega|ml native runtime. The syntax will be something like this:

    # this will start the streaming consumer on the runtime
    om.streams.put(streaming_function, 'myconsumer')
    

    I suppose it may simplify the streaming setup. When are you going to make the next release?


    Best regards, Daria
