想象一下,您有一个由人工智能驱动的个人警报聊天助手,可以使用最新数据进行交互。无论是影响您投资的股票市场的重大变动、共享 SharePoint 文档的任何重大变化,还是您等待的 Amazon 折扣,该应用程序都旨在让您随时了解情况,并根据情况提醒您任何重大变化。您使用自然语言提前设置的标准。
在这篇文章中,我们将学习如何使用非常酷的工具在 Python 中构建全栈事件驱动的天气警报聊天应用程序:Streamlit,NATS 和 OpenAI。该应用程序可以收集实时天气信息,使用人工智能了解您的警报标准,并将这些警报发送到用户界面。
这段内容和代码示例对于那些热爱技术的人或开发人员来说非常有帮助,可以帮助他们了解现代实时警报系统如何与大型语言模型 (LLM) 配合使用如何实现一个。
您还可以快速跳转到我们GitHub 并亲自尝试一下。
<图>
<源data-testid =“og”尺寸=“(最小分辨率:4dppx)和(最大宽度:700px)50vw,(-webkit-min-device-pixel-ratio:4)和(最大宽度:700px) )50vw,(最小分辨率:3dppx)和(最大宽度:700px)67vw,(-webkit-min-device-pixel-ratio:3)和(最大宽度:700px)65vw,(最小分辨率:2.5 dppx)和(最大宽度:700px)80vw,(-webkit-min-device-pixel-ratio:2.5)和(最大宽度:700px)80vw,(最小分辨率:2dppx)和(最大宽度:700px ) 100vw, (-webkit-min-device-pixel-ratio: 2) 和 (最大宽度: 700px) 100vw, 700px" srcset="https://miro.medium.com/v2/resize:fit:640/ 1*tIUWgkN0WyV3XIYO-KZV9w.gif 640w,https://miro.medium.com/v2/resize:fit:720/1*tIUWgkN0WyV3XIYO-KZV9w.gif 720w,https://miro.medium.com/v2/resize:适合:750/1*tIUWgkN0WyV3XIYO-KZV9w.gif 750w,https://miro.medium.com/v2/resize:适合:786/1*tIUWgkN0WyV3XIYO-KZV9w.gif 786w,https://miro.medium.com/ v2/resize:fit:828/1*tIUWgkN0WyV3XIYO-KZV9w.gif 828w,https://miro.medium.com/v2/resize:fit:1100/1*tIUWgkN0WyV3XIYO-KZV9w.gif 1100w,https://miro. medium.com/v2/resize:适合:1400/1*tIUWgkN0WyV3XIYO-KZV9w.gif 1400w"/>
图>
幕后力量
让我们仔细看看人工智能天气警报聊天应用程序如何工作,并将原始数据转换为可操作的警报,让您领先天气一步。我们应用程序的核心是用 Python 实现的响应式后端,由 NATS 提供支持,以确保实时数据处理和消息管理。集成 OpenAI 的 GPT 模型使对话式 AI 栩栩如生,能够理解警报的性质并响应用户查询。用户可以用自然语言指定警报标准,然后 GPT 模型将对其进行解释。
<图>
<源data-testid =“og”尺寸=“(最小分辨率:4dppx)和(最大宽度:700px)50vw,(-webkit-min-device-pixel-ratio:4)和(最大宽度:700px) )50vw,(最小分辨率:3dppx)和(最大宽度:700px)67vw,(-webkit-min-device-pixel-ratio:3)和(最大宽度:700px)65vw,(最小分辨率:2.5 dppx)和(最大宽度:700px)80vw,(-webkit-min-device-pixel-ratio:2.5)和(最大宽度:700px)80vw,(最小分辨率:2dppx)和(最大宽度:700px ) 100vw, (-webkit-min-device-pixel-ratio: 2) 和 (最大宽度: 700px) 100vw, 700px" srcset="https://miro.medium.com/v2/resize:fit:640/ 0*2HurMmD5KNNu4MWS.png 640w,https://miro.medium.com/v2/resize:fit:720/0*2HurMmD5KNNu4MWS.png 720w,https://miro.medium.com/v2/resize:fit:750/ 0*2HurMmD5KNNu4MWS.png 750w,https://miro.medium.com/v2/resize:fit:786/0*2HurMmD5KNNu4MWS.png 786w,https://miro.medium.com/v2/resize:fit:828/ 0*2HurMmD5KNNu4MWS.png 828w,https://miro.medium.com/v2/resize:fit:1100/0*2HurMmD5KNNu4MWS.png 1100w,https://miro.medium.com/v2/resize:fit:1400/ 0*2HurMmD5KNNu4MWS.png 1400w"/>
图>
图 1:实时警报应用架构
实时数据收集
旅程开始于从后端的各种来源连续异步收集天气数据。我们的应用程序现在使用 api.weatherapi.com
服务,每 10 秒获取一次实时天气信息。这些数据包括温度、湿度、降水量等,覆盖全球各地。此代码片段异步获取爱沙尼亚当前的天气数据,但可以改进该应用程序以动态设置用户输入的位置:
异步 def fetch_weather_data():
api_url = f"http://api.weatherapi.com/v1/current.json?key={weather_api_key}&q=爱沙尼亚"
尝试:
与 aiohttp.ClientSession() 异步作为会话:
与 session.get(api_url) 异步作为响应:
如果响应.status == 200:
返回等待response.json()
别的:
logging.error(f"获取天气数据时出错:HTTP {response.status}")
返回无
除了异常 e:
logging.error(f"获取天气数据时出错:{e}")
返回无
NATS 在数据流中的作用
nats.py 库将 NATS 集成到 Python 中代码。首先,我们建立与 Docker 位于 nats://localhost:4222
。
nats_client = wait nats.connect("nats://localhost:4222")
然后,我们定义一个异步 message_handler
函数,用于订阅和处理从 NAT 服务器接收到的 chat
主题的消息。如果消息以“设置警报:”开头(我们将其附加在前端),它将提取并更新用户的警报条件。
异步 def message_handler(msg):
非本地 user_alert_criteria
数据 = msg.data.decode()
if data.startswith("设置警报:"):
user_alert_criteria = data[len("设置警报:"):].strip()
logging.info(f"用户警报条件已更新:{user_alert_criteria}")
等待 nats_client.subscribe("聊天", cb=message_handler)
后端服务与 Weather API 和 开放AI聊天完成API。如果天气数据和用户警报标准都存在,则应用程序会为 OpenAI 的 GPT 模型构建提示,以确定天气是否满足用户的标准。该提示要求人工智能根据用户的标准分析当前天气,并回答“是”或“否”以及简短的天气摘要。一旦 AI 确定传入的天气数据符合用户的警报标准,它就会制作个性化警报消息并向 NATS 服务器上的 chat_response
主题发布天气警报,以使用最新更改更新前端应用程序。此消息包含旨在通知和建议用户的用户友好通知。例如,它可能会说:“注意!爱沙尼亚明天预计会下雨。别忘了带伞!”
当 True 时:
当前天气 = 等待 fetch_weather_data()
如果 current_weather 和 user_alert_criteria:
logging.info(f"当前天气数据:{current_weather}")
提示 = f“使用当前天气:{current_weather} 信息和用户警报标准:{user_alert_criteria}。确定天气是否满足这些标准,并仅返回 YES 或 NO 以及简短的天气温度信息,而不解释原因。”
response_text = 等待 get_openai_response(提示)
如果response_text和response_text中的“YES”:
logging.info(“天气条件符合用户标准。”)
ai_response = f"天气警报!您指定的条件已满足。{response_text}"
等待 nats_client.publish("chat_response", Payload=ai_response.encode())
别的:
logging.info(“天气条件不符合用户标准。”)
别的:
logging.info("没有设置当前天气数据或用户警报条件。")await asyncio.sleep(10)
实时发送和接收警报
让我们了解一下后端和前端之间的整体通信流程。
- 通过使用 Streamlit 构建的简单聊天界面(请参阅frontend.py 文件),用户使用自然语言输入天气警报标准并提交。
alert_criteria = st.text_input("设置天气警报条件", key="alert_criteria",disabled=st.session_state['alert_set'])< /前>
- 下面,Streamlit 前端代码通过 NATS 消息传递与后端服务交互。它将这些标准发布到
聊天
主题上的 NATS 服务器。
def send_message_to_nats_handler(消息):
使用 NATSClient() 作为客户端:
客户端.connect()
client.publish("聊天", Payload=message.encode())
client.subscribe("chat_response",callback=read_message_from_nats_handler)
客户端.wait()
如果设置_alert_btn:
st.session_state['alert_set'] = True
st.success('警报条件设置')
send_message_to_nats_handler(f"设置警报:{alert_criteria}")
- 正如我们在上一节中看到的,后端监听
聊天
主题,接收条件,获取当前天气数据,并使用 AI 来确定是否应触发警报。如果满足条件,后端会向chat_response
主题发送一条警报消息。前端收到此消息并更新 UI 以通知用户。
def read_message_from_nats_handler(msg):
消息 = msg.payload.decode()
st.session_state['conversation'].append(("AI", message))
st.markdown(f" AI: {message}", unsafe_allow_html=True)
尝试一下
要详细探索实时天气警报聊天应用程序并亲自尝试,请访问前面链接的 GitHub 存储库。该存储库包含所有必要的代码、详细的设置说明以及可帮助您入门的附加文档。设置完成后,您可以启动 Streamlit 前端和 Python 后端。设置天气警报标准,并查看系统如何处理实时天气数据以使您了解情况。
图片 2:提醒应用的 Streamlit UI
<图>
<源数据-testid ="og" 尺寸="(最小分辨率:4dppx)和(最大宽度:700px)50vw,(-webkit-min-device-pixel-ratio:4)和(最大宽度:700px)50vw,(最小-分辨率:3dppx)和(最大宽度:700px)67vw,(-webkit-min-device-pixel-ratio:3)和(最大宽度:700px)65vw,(最小分辨率:2.5dppx)和(最大-宽度:700px)80vw,(-webkit-最小设备像素比:2.5)和(最大宽度:700px)80vw,(最小分辨率:2dppx)和(最大宽度:700px)100vw,(- webkit-min-device-pixel-ratio: 2) 和 (最大宽度: 700px) 100vw, 700px" srcset="https://miro.medium.com/v2/resize:fit:640/0*PJFV0RFS9ttz2wj8.png 640w,https://miro.medium.com/v2/resize:fit:720/0*PJFV0RFS9ttz2wj8.png 720w,https://miro.medium.com/v2/resize:fit:750/0*PJFV0RFS9ttz2wj8.png 750w,https://miro.medium.com/v2/resize:fit:786/0*PJFV0RFS9ttz2wj8.png 786w,https://miro.medium.com/v2/resize:fit:828/0*PJFV0RFS9ttz2wj8.png 828w,https://miro.medium.com/v2/resize:fit:1100/0*PJFV0RFS9ttz2wj8.png 1100w,https://miro.medium.com/v2/resize:fit:1400/0*PJFV0RFS9ttz2wj8.png 1400w"/>构建流处理管道
图>
实时天气警报聊天应用程序展示了 NATS 在分布式系统中进行实时消息传递的强大用例,允许面向用户的前端和数据处理之间进行高效通信后端。但是,您应该考虑几个关键步骤,以确保向用户提供的信息相关、准确且可操作。在应用程序中,我们只是获取实时原始天气数据并将其直接发送到 OpenAI 或前端。有时,您需要在数据到达外部服务之前对其进行实时转换以过滤、丰富、聚合或标准化。您开始考虑创建一个具有多个阶段的流处理管道。
例如,并非所有从 API 获取的数据都与每个用户相关,您可以在初始阶段过滤掉不必要的信息。此外,数据可以采用多种格式,特别是当您从多个 API 获取信息以进行全面警报并且需要标准化此数据时。在下一阶段,您可以使用原始数据的额外上下文或信息来丰富数据,以使其更有用。这可能包括将当前天气状况与历史数据进行比较,以识别异常模式或使用另一个外部 API 添加基于位置的见解,例如针对特定区域天气状况的具体建议。在稍后阶段,您可以汇总每小时温度数据,以给出白天平均温度或突出显示白天达到的峰值温度。
后续步骤
当涉及到在生产环境中转换数据、部署、运行和扩展应用程序时,您可能希望使用 Python 中的专用框架(例如 GlassFlow)来构建复杂的流处理管道。 GlassFlow 为流处理提供完全托管的无服务器基础设施,您无需考虑设置或维护,应用程序可以轻松处理大量数据和用户请求。它提供了先进的状态管理功能,可以更轻松地跟踪用户警报标准和其他应用程序状态。您的应用程序可以根据用户群进行扩展,而不会影响性能。