Most machine learning serving tutorials focus on real-time synchronous services that respond to prediction requests immediately. However, this approach can be a poor fit for long-running tasks: it struggles with traffic spikes, needs more powerful machines to respond quickly, and prediction results can be lost if the client or server fails.
In this blog post, we show how to use Celery and Redis to run machine learning models as asynchronous workers. We use the Florence 2 base model, a vision-language model known for its excellent performance. The tutorial provides a minimal yet functional example that you can adapt and extend for your own use case.
You can see a demo of the app here: https://caption-app-dfmj3maizq-ew.a.run.app/
The core of our solution is Celery, a Python library that implements the client/worker pattern described below. It lets you distribute the computational work across many workers and improves the scalability of ML inference under high and unpredictable loads.
The process works as follows:
- A client submits a task, along with some parameters, to a queue managed by a broker (Redis in this example) and immediately receives a task ID.
- A worker (or several workers) continuously monitors the queue and picks up tasks as they arrive. It then executes them and stores the results in the result backend.
- The client can use the task ID to retrieve the result, either by polling the backend or by subscribing to the task's channel.
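To make the three steps concrete, here is a toy, in-process model of the same submit / execute / fetch-by-ID cycle using only the standard library. It is not how Celery works internally: a `queue.Queue` stands in for the broker, a dict for the result backend, and a thread for the worker, and there is no error handling or persistence. The names `submit`, `worker_loop`, `fetch`, and `demo` are illustrative only.

```python
import queue
import threading
import uuid

# Toy stand-ins: a FIFO queue plays the broker, a dict plays the result backend
broker = queue.Queue()
backend = {}
backend_lock = threading.Lock()

# Registry of task functions the worker knows how to run
TASKS = {"add": lambda x, y: x + y}


def submit(name, *args):
    """Client side: enqueue a task and return its ID immediately."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, name, args))
    return task_id


def worker_loop():
    """Worker side: take tasks off the queue, run them, and store the results."""
    while True:
        task_id, name, args = broker.get()
        if name is None:  # sentinel message: stop the worker
            broker.task_done()
            return
        result = TASKS[name](*args)
        with backend_lock:
            backend[task_id] = result
        broker.task_done()


def fetch(task_id):
    """Client side: look up a result by ID; None means it is not ready yet."""
    with backend_lock:
        return backend.get(task_id)


def demo():
    worker = threading.Thread(target=worker_loop, daemon=True)
    worker.start()
    task_id = submit("add", 4, 6)
    broker.join()  # block until the queue is drained; a real client would poll instead
    result = fetch(task_id)
    broker.put((None, None, ()))  # tell the worker to exit
    worker.join()
    return task_id, result


if __name__ == "__main__":
    print(demo())
```

The key property it shares with the real system is that `submit` returns before any work is done; the ID is the only link between the client and the eventual result.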
Let's start with a simple example.
First, run Redis:
docker run -p 6379:6379 redis
Here is the worker code:
from celery import Celery
# Configure Celery to use Redis as both the broker and the result backend
app = Celery(
    "tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)
# Define a simple task
@app.task
def add(x, y):
    return x + y
if __name__ == "__main__":
    # Start a worker in this process
    app.worker_main(["worker", "--loglevel=info"])
Client code:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
# List the tasks the workers are currently executing
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
# Build a signature from the task name; the client does not need to import the worker code
add = app.signature(task_name)
print("Got task")
# Send a task to the worker
result = add.delay(4, 6)
print("Waiting for task")
result.wait()
# Get the result
print(f"Result: {result.result}")
This prints the expected result: “Result: 10”
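`result.wait()` blocks until the task finishes. If the client needs to stay responsive, it can poll with a deadline instead. The helper below is a hypothetical sketch (`poll_result` is not a Celery API); it relies only on `ready()` and `get()`, which Celery's `AsyncResult` does provide, so any object with those two methods works with it.

```python
import time


def poll_result(async_result, interval=0.5, timeout=60.0):
    """Poll a Celery-style AsyncResult until it is ready or the deadline passes.

    Hypothetical helper: only .ready() and .get() are assumed on async_result.
    """
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task not ready after {timeout} s")
        time.sleep(interval)
    # The task has finished, so get() returns the value (or re-raises the task's error) at once
    return async_result.get()
```

With the client above, this would look like `value = poll_result(add.delay(4, 6), interval=0.2, timeout=10)`. Between polls, the client is free to do other work, which is the main reason to prefer polling over a blocking wait.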
Now let's move on to the actual use case: the Florence 2 service.
You'll build a multi-container image captioning application that uses Redis for task queuing, Celery for task distribution, and local volumes or Google Cloud Storage for image storage. The application has four core components: model inference, task distribution, client interaction, and file storage.
High-level architecture:
- Client: sends an image captioning request to the worker (via the broker).
- Worker: receives requests, downloads images, runs inference with a pre-trained model, and returns the results.
- Redis: acts as the message broker that carries messages between clients and workers.
- File storage: temporarily stores image files.
Component breakdown:
1. Model inference (model.py):
- Dependencies and initialization:
import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor
# Load the fine-tuned Florence-2 base model and its processor once, when the module is imported
model = Florence2ForConditionalGeneration.from_pretrained(
    "microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")
- Imports the libraries needed for image processing, web requests, Google Cloud Storage access, and logging.
- Initializes the pre-trained Florence-2 model and processor used for caption generation.
- Download image (download_image):
def download_image(url):
    if url.startswith("http://") or url.startswith("https://"):
        # Handle HTTP/HTTPS URLs
        # ... (code to download the image from the URL) ...
    elif url.startswith("gs://"):
        # Handle Google Cloud Storage paths
        # ... (code to download the image from GCS) ...
    else:
        # Handle local file paths
        # ... (code to open the image from a local path) ...
- Downloads an image from the specified location.
- Supports HTTP/HTTPS URLs, Google Cloud Storage paths (gs://), and local file paths.
- Run inference (run_inference):
def run_inference(url, task_prompt):
    # ... (code to download the image using the download_image function) ...
    try:
        # ... (code to open and process the image) ...
        inputs = processor(text=task_prompt, images=image, return_tensors="pt")
    except ValueError:
        # ... (error handling) ...
    # ... (code to generate captions using the model) ...
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        # ... (model generation parameters) ...
    )
    # ... (code to decode the generated captions) ...
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    # ... (code to post-process the generated captions) ...
    parsed_answer = processor.post_process_generation(
        generated_text, task=task_prompt, image_size=(image.width, image.height)
    )
    return parsed_answer
Handles the image captioning process:
- Downloads the image with download_image.
- Prepares the image and the task prompt as model inputs.
- Generates captions with the loaded Florence-2 model.
- Decodes and post-processes the generated captions.
- Returns the final caption.
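The body of download_image is elided above. One plausible way to fill in its three branches is sketched below; it is an assumption, not the repository's code. The hypothetical `load_image_bytes` returns raw bytes, which the caller can wrap with `Image.open(BytesIO(...))` to match the BytesIO and PIL imports in model.py; `parse_gcs_uri` is likewise a hypothetical helper. The third-party imports sit inside their branches so that local paths work even where `requests` or `google-cloud-storage` is not installed.

```python
import os
from urllib.parse import urlparse


def parse_gcs_uri(uri):
    """Split 'gs://bucket/path/to/blob' into ('bucket', 'path/to/blob')."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a GCS URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def load_image_bytes(url, timeout=30):
    """Return the raw bytes of an image from an HTTP(S) URL, a gs:// path, or a local path."""
    if url.startswith(("http://", "https://")):
        import requests  # third-party; only needed for web URLs

        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # fail loudly on 4xx/5xx instead of decoding an error page
        return response.content
    if url.startswith("gs://"):
        from google.cloud import storage  # needs google-cloud-storage and GCP credentials

        bucket_name, blob_name = parse_gcs_uri(url)
        blob = storage.Client().bucket(bucket_name).blob(blob_name)
        return blob.download_as_bytes()
    # Anything else is treated as a local file path
    with open(os.path.expanduser(url), "rb") as f:
        return f.read()
```

In run_inference, the image would then be obtained with something like `Image.open(BytesIO(load_image_bytes(url))).convert("RGB")`; converting to RGB avoids surprises with palette or RGBA images.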
2. Task distribution (worker.py):
import os
from celery import Celery
# ... other imports ...
# Get the Redis URL from an environment variable or fall back to a default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as both the broker and the result backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery configuration) ...
- Configures Celery to use Redis as the message broker (and result backend) for task distribution.
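The Celery configuration is elided in the snippet above. As a hedged sketch, these are real Celery settings that are commonly tuned for slow, long-running tasks such as model inference; the specific values are assumptions, not what the repository necessarily uses. Note that with Redis, redelivery of unacknowledged tasks is governed by the broker's visibility timeout, so check the Celery documentation before relying on it. `CELERY_CONFIG` and `make_app` are hypothetical names.

```python
import os

# Same default as worker.py
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# Hypothetical settings for long-running inference tasks (not taken from the repository)
CELERY_CONFIG = {
    # Acknowledge a message only after the task finishes, so it can be redelivered if the worker dies
    "task_acks_late": True,
    # Reserve one task per worker process at a time, so a slow inference does not hoard queued tasks
    "worker_prefetch_multiplier": 1,
    # Report a STARTED state while the model is running, instead of staying PENDING
    "task_track_started": True,
    # Remove stored results from Redis after one hour (value in seconds)
    "result_expires": 3600,
}


def make_app():
    """Build the Celery app with the settings above."""
    from celery import Celery  # imported here so the settings can be inspected without Celery

    app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
    app.conf.update(CELERY_CONFIG)
    return app
```

The first two settings address the failure mode from the introduction: with early acknowledgement and aggressive prefetching, a crashed worker can take several already-reserved tasks down with it.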
- Task definition (inference_task):
@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
    # ... (logging and error handling) ...
    return run_inference(url, task_prompt)
- Defines inference_task, which is executed by Celery workers.
- The task calls the run_inference function from model.py.
- Run the worker:
if __name__ == "__main__":
    app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
- Starts a Celery worker that listens for and executes tasks.
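The task is declared with `bind=True` and `max_retries=3`, but its error handling is elided. Binding is what gives the task access to `self`, and therefore to `self.retry()` and `self.request.retries`. Below is a hedged sketch of how those could be used to retry transient failures such as a network error while downloading an image. `register_inference_task`, `retry_countdown`, and the `retry_on=(OSError,)` default are hypothetical choices: `OSError` covers the exceptions raised by `requests` and by file I/O, but not model errors, which usually should not be retried. Taking `app` and `run_inference` as arguments keeps the sketch importable without Celery; in worker.py you would apply the decorator directly.

```python
def retry_countdown(retries, base=2.0, cap=60.0):
    """Exponential backoff: base * 2**retries seconds, capped at `cap`."""
    return min(cap, base * 2 ** retries)


def register_inference_task(app, run_inference, retry_on=(OSError,)):
    """Hypothetical helper: register a bound, retrying inference task on a Celery app."""

    # An explicit name keeps the task addressable as "tasks.inference_task"
    # no matter which module registers it
    @app.task(bind=True, max_retries=3, name="tasks.inference_task")
    def inference_task(self, url, task_prompt):
        try:
            return run_inference(url, task_prompt)
        except retry_on as exc:
            # self.request.retries counts earlier attempts; once max_retries is
            # exceeded, self.retry() raises exc instead of scheduling another try
            raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))

    return inference_task
```

With these defaults, retries are scheduled after 2, 4, and 8 seconds, after which the original exception is recorded as the task's failure.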
3. Client interaction (client.py):
import os
from celery import Celery
# Get the Redis URL from an environment variable or fall back to a default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as both the broker and the result backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
- Establishes a connection to Celery, using Redis as the message broker.
- Send a task (send_inference_task):
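def send_inference_task(url, task_prompt):
    # Look up the task by name, as in the simple example; "tasks.inference_task"
    # assumes worker.py is started as a script with the app named "tasks"
    inference_task = app.signature("tasks.inference_task")
    task = inference_task.delay(url, task_prompt)
    print(f"Task sent with ID: {task.id}")
    # Wait for the result
    result = task.get(timeout=120)
    return result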
- Submits an image captioning task (inference_task) to the Celery worker.
- Waits for the worker to complete the task and retrieves the result.
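send_inference_task blocks for up to 120 seconds. In a web app, a common alternative is to return the task ID right away and let the caller check back later. Because `app.AsyncResult(task_id)` rebuilds a result handle from the ID alone, the check can even run in a different process from the one that submitted the task. The `task_status` helper below is a hypothetical sketch; `AsyncResult.state`, `successful()`, `failed()`, and `result` are real Celery attributes.

```python
def task_status(app, task_id):
    """Return a JSON-friendly status dict for a task ID without blocking."""
    result = app.AsyncResult(task_id)
    status = {"id": task_id, "state": result.state}
    if result.successful():
        status["result"] = result.result
    elif result.failed():
        # On failure, .result holds the exception raised by the task
        status["error"] = repr(result.result)
    return status
```

One caveat: Celery reports an unknown or expired ID as `PENDING`, so a status endpoint cannot tell "not started yet" apart from "never existed" using the state alone.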
Docker integration (docker-compose.yml):
- Docker Compose defines the multi-container setup with four services:
- redis: runs a Redis server as the message broker.
- model: builds and runs the model inference worker.
- app: builds and runs the client application.
- flower: runs Flower, a web-based tool for monitoring Celery tasks.
You can run the full stack with:
docker-compose up
That's it! You've now seen a complete walkthrough of building an asynchronous machine learning inference system with Celery, Redis, and Florence 2. We showed how to use Celery for task distribution, Redis for message brokering, and Florence 2 for image captioning. By adopting an asynchronous workflow, you can absorb large volumes of requests and traffic spikes, keep clients responsive, and make your ML inference application more resilient to failures. With the provided Docker Compose setup, you can run the entire system yourself with a single command.
Ready for the next step? Deploying this architecture to the cloud comes with its own challenges, so let us know in the comments if you'd like to see a follow-up article on cloud deployment.
code: https://github.com/CVxTz/celery_ml_deploy
demo: https://caption-app-dfmj3maizq-ew.a.run.app/

