If you start a workflow from the terminal, you can simply see which steps are being executed and what logs have been recorded for each step.
Additionally, to allow human interaction, you can add `user_feedback = input()` in your workflow. This pauses the workflow and waits for user input (see the official LlamaIndex human-in-the-loop example). However, to achieve the same functionality in a user-friendly interface, additional changes need to be made to the original workflow.
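To make the terminal-based pattern concrete, here is a minimal sketch (not the article's actual workflow) of pausing a pipeline for human feedback with a plain `input()` call; the function name and the `read_input` parameter are hypothetical, added so the example can also be driven programmatically:

```python
def review_outline(outline: str, read_input=input) -> str:
    """Show a draft outline and block until the user responds."""
    print(f"Draft outline:\n{outline}")
    # In a terminal run, this blocks the whole workflow until the
    # user types a response and presses Enter.
    user_feedback = read_input("Do you approve this outline? ")
    return user_feedback

# Answer the prompt programmatically instead of interactively:
feedback = review_outline("1. Intro\n2. Methods", read_input=lambda _: "yes")
print(feedback)  # prints "yes"
```

This works in a terminal, but it is exactly the behavior that has to be replaced with events and futures once a web frontend is involved.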
Workflows can take a long time to run, so to improve the user experience, LlamaIndex provides a way to emit streaming events that indicate the progress of the workflow, as shown in the notebook here. In my workflow, I define a `WorkflowStreamingEvent` class that contains useful information about the event message, such as the type of event and the step from which the event was sent:
class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")
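For orientation, the payload a `"server_message"` event produces after `model_dump()` is just a plain dict that gets serialized to one JSON line per event. A sketch using only the standard library (the step name and message text are illustrative):

```python
import json

# Shape of a serialized WorkflowStreamingEvent, as the frontend sees it.
event = {
    "event_type": "server_message",
    "event_sender": "tavily_query",
    "event_content": {"message": "Querying Tavily with: 'rag evaluation'"},
}

# One JSON string per event is what ultimately travels over the stream.
line = json.dumps(event)
print(line)
```

The frontend can branch on `event_type` alone to decide whether user action is required.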
To enable sending streaming events, a workflow step must have access to a shared context, which is done by adding the `@step(pass_context=True)` decorator to the step definition. The step can then send event messages about its progress through the context. For example, in the `tavily_query()` step:
@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )
In this example, `event_type` is `"server_message"`, which is an update message, meaning no user action is required. The other event type, `"request_user_input"`, indicates that user input is needed. For example, the `gather_feedback_outline()` workflow step generates a text outline for the slides from the original paper summary and then sends a message to the user requesting approval and feedback on the outline text.
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present user the original paper summary and the outlines generated, gather feedback from user"""
    ...
    # Send a special event indicating that user input is required
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )
    ...
These events are handled differently in the backend API and in the frontend logic, which are explained in more detail in later sections of this article.
When sending a `"request_user_input"` event, the workflow should only move on to the next step after the user input has been received. As shown in the workflow diagram above, it proceeds to the `outlines_with_layout()` step if the user approves the outline, or back to the `summary2outline()` step to try again if the user does not approve.
This is achieved with the `Future()` object from Python's `asyncio` library. In the `SlideGenerationWorkflow` class, we set an attribute `self.user_input_future = asyncio.Future()` that can be awaited in the `gather_feedback_outline()` step, so that the further execution of the workflow depends on the user feedback.
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...
    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

    # Process user_response, which should be a JSON string
    try:
        response_data = json.loads(user_response)
        approval = response_data.get("approval", "").lower().strip()
        feedback = response_data.get("feedback", "").strip()
    except json.JSONDecodeError:
        # Handle invalid JSON
        logger.error("Invalid user response format")
        raise Exception("Invalid user response format")

    if approval == ":material/thumb_up:":
        return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
    else:
        return OutlineFeedbackEvent(
            summary=ev.summary, outline=ev.outline, feedback=feedback
        )
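The `Future` mechanism can be illustrated in isolation. Below is a minimal, self-contained sketch (with made-up function names, separate from the article's workflow) of one coroutine awaiting a `Future` that another coroutine later resolves, mirroring how `gather_feedback_outline()` suspends until `set_result()` is called:

```python
import asyncio
import json

async def gather_feedback(fut: asyncio.Future) -> str:
    # Mirrors gather_feedback_outline(): execution suspends here
    # until someone calls fut.set_result(...).
    user_response = await fut
    return json.loads(user_response)["approval"]

async def simulate_user(fut: asyncio.Future) -> None:
    await asyncio.sleep(0.01)  # the user takes some time to respond
    fut.set_result(json.dumps({"approval": ":material/thumb_up:"}))

async def main() -> str:
    fut = asyncio.get_running_loop().create_future()
    approval, _ = await asyncio.gather(gather_feedback(fut), simulate_user(fut))
    return approval

approval = asyncio.run(main())
print(approval)  # -> :material/thumb_up:
```

In the real application, the part played by `simulate_user()` is taken over by a FastAPI endpoint that receives the user's response, as shown below.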
We use FastAPI to set up the backend, exposing a POST endpoint to process requests and start executing the workflow. The asynchronous function `run_workflow_endpoint()` takes a `ResearchTopic` as input. Inside the function, an asynchronous generator `event_generator()` is defined, which creates a task to run the workflow, streams events to the client as the workflow progresses, and, when the workflow finishes, streams the final file result to the client as well.
class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")


@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())

    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URLs
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }

            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
In addition to this endpoint, there are endpoints that receive user input from the client and handle file download requests. Since each workflow is assigned a unique workflow ID, user input received from the client can be mapped to the correct workflow. By calling `set_result()` on the pending `Future`, the waiting workflow can resume execution.
@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )
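The `call_soon_threadsafe()` call matters here: the request handler may run on a different thread than the event loop that created the `Future`, and calling `set_result()` directly from another thread is not safe. A standalone sketch of the pattern (independent of the article's classes):

```python
import asyncio
import threading

async def workflow() -> str:
    # The workflow creates a Future on its own loop and awaits it.
    fut = asyncio.get_running_loop().create_future()

    def submit_from_other_thread() -> None:
        # Simulates the API handler thread: rather than calling
        # fut.set_result() directly, it schedules it on the future's loop.
        loop = fut.get_loop()
        loop.call_soon_threadsafe(fut.set_result, "user input")

    threading.Thread(target=submit_from_other_thread).start()
    return await fut

result = asyncio.run(workflow())
print(result)  # -> user input
```

The `set_result` callback runs on the future's own loop the next time it regains control, which is what unblocks `await fut` safely.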
The download endpoints also locate the final file based on the workflow ID.
@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")
On the front-end page, after a user submits a research topic through `st.text_input()`, a long-running task is started in a background thread with its own event loop to receive events streamed from the backend, without interfering with the rest of the page.
def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():
    ...
    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

        if submit_button:
            # Reset the workflow_complete flag for a new workflow
            st.session_state.workflow_complete = False
            # Start the long-running task in a separate thread
            if (
                st.session_state.workflow_thread is None
                or not st.session_state.workflow_thread.is_alive()
            ):
                st.write("Starting the background thread...")

                st.session_state.workflow_thread = threading.Thread(
                    target=start_long_running_task,
                    args=(
                        "http://backend:80/run-slide-gen",
                        {"query": query},
                        st.session_state.message_queue,
                        st.session_state.user_input_event,
                    ),
                )
                st.session_state.workflow_thread.start()
                st.session_state.received_lines = []
            else:
                st.write("Background thread is already running.")
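The thread-plus-own-loop pattern can be demonstrated without Streamlit or httpx. A minimal sketch (with a stand-in coroutine replacing `get_stream_data()`) showing a worker thread running its own event loop and handing results to the main thread through a `queue.Queue`:

```python
import asyncio
import queue
import threading

async def stream_fake_events(message_queue: queue.Queue) -> None:
    # Stand-in for get_stream_data(): pushes events to the main thread.
    for i in range(3):
        await asyncio.sleep(0.01)
        message_queue.put(("message", f"event {i}"))

def background_task(message_queue: queue.Queue) -> None:
    # Each thread needs its own event loop; the main thread stays
    # free to keep rendering the UI.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(stream_fake_events(message_queue))
    loop.close()

message_queue: queue.Queue = queue.Queue()
worker = threading.Thread(target=background_task, args=(message_queue,))
worker.start()
worker.join()  # in the real app the thread keeps running instead

messages = [message_queue.get_nowait() for _ in range(message_queue.qsize())]
print(messages)
```

A thread-safe `queue.Queue` is the natural bridge here because Streamlit's script reruns happen on the main thread, which can drain the queue on each refresh.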
Event data streamed from the backend is fetched with `httpx.AsyncClient` and placed in a message queue for further processing. Depending on the type of event, different information is extracted. For `"request_user_input"` events, the thread is also paused until user input is provided.
async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line


async def get_stream_data(url, payload, message_queue, user_input_event):
    # message_queue.put(("message", "Starting to fetch streaming data..."))
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:  # todo: is this necessary?
                message_queue.put(("message", data))
        if data_json and "final_result" in data_json or "final_result" in str(data):
            break  # Stop processing after receiving the final result
The messages are stored in `st.session_state`, and an `st.expander()` is used to display and update this streamed data.
if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()
To ensure that the UI stays responsive and event messages become visible while they are processed in the background thread, a customized autorefresh component refreshes the page at a fixed interval:
if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")
When the streamed event is of type `"request_user_input"`, we display the relevant information in a separate container and collect feedback from the user. Since a single workflow run can produce multiple events requiring user input, they are put in a message queue, and each is assigned a unique key so that the `st.feedback()`, `st.text_area()` and `st.button()` widgets are bound to their own event and do not interfere with each other.
def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # Display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return

                        ...
Finally, when the workflow finishes executing, the front-end client receives a response containing paths to the final generated files (a PDF for rendering in the UI and the same slide deck in pptx format for downloading as the final result). We create buttons to view the PDF file and to download the pptx file.
if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
    download_url_pdf = st.session_state.download_url_pdf
    try:
        # Fetch the PDF content
        pdf_response = requests.get(download_url_pdf)
        pdf_response.raise_for_status()
        st.session_state.pdf_data = pdf_response.content

        st.markdown("### Generated Slide Deck:")
        # Display the PDF using an iframe
        st.markdown(
            f'<iframe src="data:application/pdf;base64,{base64.b64encode(st.session_state.pdf_data).decode()}" width="100%" height="600px" type="application/pdf"></iframe>',
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Failed to load the PDF file: {str(e)}")

# Provide the download button for PPTX if available
if (
    "download_url_pptx" in st.session_state
    and st.session_state.download_url_pptx
):
    download_url_pptx = st.session_state.download_url_pptx
    try:
        # Fetch the PPTX content
        pptx_response = requests.get(download_url_pptx)
        pptx_response.raise_for_status()
        pptx_data = pptx_response.content

        st.download_button(
            label="Download Generated PPTX",
            data=pptx_data,
            file_name="generated_slides.pptx",
            mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
        )
    except Exception as e:
        st.error(f"Failed to load the PPTX file: {str(e)}")
We package this as a multi-service Docker application and use `docker-compose` to run the front-end and back-end apps.
version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure

  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:
That's it! Just run `docker-compose up`, and you have an app that can execute a research workflow based on a user's input query, request feedback from the user during execution, and display the final results to the user.

