免責聲明
本文中使用的股票数据完全是虚构的。它纯粹用于演示目的。请不要将这些数据用于做出任何财务决策。
本文将展示如何将 Kafka 代理(流式处理示例股票报价数据)连接到 SingleStoreDB。然后,我们将通过 LangChain 使用英语句子查询数据,它为即时报价数据提供了基本的问答功能。我们将构建一个Python应用程序,通过几次设计迭代,使用 OpenAI的Whisper 通过语音提问,并使用语音合成来回答。
笔记本文件、SQL 和 Python 代码可在 GitHub 上找到。
介绍
使用自然语言询问有关数据库系统的问题的能力是熟悉的。然而,使用LangChain和OpenAI的Whisper等现代工具实现它变得更加容易。在本文中,我们将看到如何操作。
创建单店云账号
上一篇文章介绍了创建免费SingleStoreDB云帐户的步骤。我们将使用以下设置:
- 工作区组名称: 耳语演示组
- 云提供商: 自主技术
- 地区: 美国东部 1(弗吉尼亚北部)
- 工作区名称: 耳语演示
- 大小: S-00
- 设置: 未选择任何
工作区可用后,我们将记下 密码 和 主机。主机可从 Whisper 演示组>概述>工作区> 耳语演示>直接连接> SQL IDE >主机>提供。稍后,我们将在 Python 应用程序中需要此信息。我们将通过在“ 耳语演示组>防火墙”下配置防火墙来暂时允许从任何地方访问。
创建数据库和表
在左侧导航窗格中,我们将选择 “开发”> SQL 编辑器 以创建 timeseries_db
数据库和 tick
stock_sentiment
表,如下所示:
CREATE DATABASE IF NOT EXISTS timeseries_db;
USE timeseries_db;
CREATE TABLE IF NOT EXISTS tick (
symbol VARCHAR(10),
ts DATETIME SERIES TIMESTAMP,
open NUMERIC(18, 2),
high NUMERIC(18, 2),
low NUMERIC(18, 2),
price NUMERIC(18, 2),
volume INT,
KEY(ts)
);
CREATE TABLE IF NOT EXISTS stock_sentiment (
headline VARCHAR(250),
positive FLOAT,
negative FLOAT,
neutral FLOAT,
url TEXT,
publisher VARCHAR(30),
ts DATETIME,
symbol VARCHAR(10)
);
创建管道
管道允许我们使用单个命令从各种来源(例如 Kafka、S3 和 HDFS)创建流式摄取源 对于我们的用例,我们将在 SingleStoreDB 中创建一个简单的管道,如下所示:
CREATE PIPELINE tick
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/stockticker'
BATCH_INTERVAL 2500
INTO TABLE tick
FIELDS TERMINATED BY ','
(symbol,ts,open,high,low,price,volume);
我们将使用 BATCH_INTERVAL
.最初,我们将此值设置为 2500 毫秒。
我们将管道配置为从最早的偏移量开始,如下所示:
ALTER PIPELINE tick SET OFFSETS EARLIEST;
我们将在开始运行管道之前对其进行测试,如下所示:
TEST PIPELINE tick LIMIT 1;
输出应类似于以下内容:
+--------+---------------------+-------+-------+-------+-------+--------+
| symbol | ts | open | high | low | price | volume |
+--------+---------------------+-------+-------+-------+-------+--------+
| AIV | 2023-09-05 06:47:53 | 44.89 | 44.89 | 44.88 | 44.89 | 719 |
+--------+---------------------+-------+-------+-------+-------+--------+
请记住,这是虚构的数据
这些数据派生自 GitHub Gist ,是 Kaggle 数据的一小部分。 CSV 文件已包含情绪值。我们将下载此文件,然后使用 MySQL 客户端连接到 SingleStoreDB 云,如下所示:
mysql --local-infile -u admin -h <host> -P 3306 --default-auth=mysql_native_password -p
<host>
替换为我们之前从SingleStoreDB Cloud获得的值。我们将从 CSV 文件将数据加载到 SingleStoreDB 中,如下所示:
USE timeseries_db;
LOAD DATA LOCAL INFILE '/path/to/stock_sentiment.csv'
INTO TABLE stock_sentiment
IGNORE 1 LINES
COLUMNS TERMINATED BY '\t';
替换为 /path/to/
CSV 文件所在的实际路径。
加载笔记本
我们将使用 GitHub 上提供的笔记本文件。在 SingleStoreDB Cloud 的左侧导航窗格中,我们将选择 “笔记本”。网页的右上角将是 “新建笔记本 ”,其下拉列表有两个选项:
- 新笔记本
- 从文件导入
我们将选择第二个选项,找到我们从GitHub下载的笔记本文件并将其加载到SingleStoreDB Cloud中。
运行笔记本
我们将首先使用笔记本上方的下拉菜单选择 连接 (耳语演示)和 数据库 (timeseries_db)。
分析时间序列数据
笔记本的第 1 部分包含对表中数据的 tick
一组时序操作com/articles/using-singlestore-as-a-time-series-database“>上一篇文章。
LangChain OnlinePDFLoader
笔记本的第 2 部分将 PDF 文档的内容和矢量嵌入加载到名为 fintech_docs
的表中。这些操作在 上一篇文章中有更详细的描述。替换为 <PDF document URL>
您选择的金融科技文件的超链接:
from langchain.document_loaders import OnlinePDFLoader
loader = OnlinePDFLoader("<PDF document URL>")
data = loader.load()
必须通过选择右上角的 “编辑防火墙 ”选项将 PDF 文件所在的完全限定域名 (FQDN) 添加到防火墙中。
我们将使用 ChatGPT 来回答有关 PDF 文件的问题,例如:
"What are the best investment opportunities in Blockchain?"
LangChain SQL 代理
笔记本的第 3 部分包含对 和 tick
stock_sentiment
表中数据的 LangChain 代理操作。这将是本文的主要重点。
首先,我们将安装一些库:
!pip install langchain --quiet
!pip install openai --quiet
现在,我们将输入我们的 OpenAI API Key
:
agent_executor.run(
"""
From the tick table, which stock symbol saw the least volatility in share trading in the dataset?
"""
)
下面是一些示例输出:
'The stock symbol with the least volatility in share trading in the dataset is FTR
run(query)’ data-lang=“text/x-python”>
query = input("Please enter your question:")
agent_executor.run(query)
我们将使用示例查询对此进行测试:
Using the symbol A, what is the most positive sentiment in the stock_sentiment table and the current best price for this symbol from the tick table?
下面是一些示例输出:
'The most positive sentiment for symbol A is 0.9576 and the current best price is 46.43.'
要查看链输出,我们将设置为 verbose
True
,如下所示:
agent_executor = create_sql_agent(
llm = OpenAI(temperature = 0), toolkit = toolkit, verbose = True
)
上一篇文章中显示了另一个数据库的链输出示例。
奖励:构建可视化 Python 应用程序
我们现在将构建一个简单的Python应用程序,该应用程序使用OpenAI的Whisper来询问有关数据库系统的问题。一篇 出色的文章 启发了此应用程序。
安装所需的软件
在本文中,这是在 VMware Fusion 虚拟机中运行的 Ubuntu 22.04.2 全新安装所需的软件:
langchain
matplotlib
openai
openai-whisper
pyaudio
pymysql
pyttsx3 wave
这些可以在 GitHub 上的文件中找到 requirements.txt
。按如下方式运行该文件:
pip install -r requirements.txt
openai-whisper
安装可能需要一段时间。
我们需要在我们的环境中提供一个 OpenAI API Key
。例如:
export OPENAI_API_KEY="<OpenAI API Key>"
替换为 <OpenAI API Key>
您的密钥。
在以下每个应用程序中,我们都有以下代码:
s2_password = "<password>"
s2_host = "<host>"
我们将 <password>
用之前从 SingleStoreDB Cloud 保存的值替换 和 <host>
。
第一次迭代
让我们从一个简单的使用 Tkinter的可视化Python应用程序开始。我们还将使用OpenAI的Whisper添加语音识别。该应用程序最多可以录制 20 秒的语音。它可以按如下方式运行:
图1.第一次迭代。
第二次迭代
在下一次迭代中,我们将添加一个音频波形。我们将按如下方式运行该程序:
python3 record-transcribe-visualise.py
示例输出如图 2 所示。
图2.第二次迭代。
第三次迭代
在第三次也是最后一次迭代中,我们将删除应用程序基于文本的响应,并将其替换为语音合成。我们将按如下方式运行该程序:
python3 record-transcribe-visualise-speak.py
示例输出如图 3 所示。
图3.第三次迭代。
在所有三次迭代的代码中,我们可以调用 OpenAI Whisper API,而不是使用本地 Whisper 安装。为此,我们需要在函数中 transcribe_audio
取消注释这些代码行:
# transcript = openai.Audio.transcribe(
# model = "whisper-1",
# file = audio_file,
# language = "en"
# )
并注释掉这行代码:
transcript = model.transcribe(filename)
但是,调用OpenAI Whisper API会产生额外的成本。本地的耳语安装已经提供了出色的结果。
总结
在本文中,我们已经看到,在不创建向量嵌入的情况下,我们已经能够使用 LangChain 非常有效地访问我们的数据。但是,我们的查询需要更加集中,并且在提出问题之前我们需要了解数据库架构。集成语音功能使我们的应用程序能够得到更广泛的应用。