免責聲明

本文中使用的股票数据完全是虚构的。它纯粹用于演示目的。请不要将这些数据用于做出任何财务决策。

本文将展示如何将 Kafka 代理(流式处理示例股票报价数据)连接到 SingleStoreDB。然后,我们将通过 LangChain 使用英语句子查询数据,它为即时报价数据提供了基本的问答功能。我们将构建一个Python应用程序,通过几次设计迭代,使用 OpenAI的Whisper 通过语音提问,并使用语音合成来回答。

笔记本文件、SQL 和 Python 代码可在 GitHub 上找到。

介绍

使用自然语言询问有关数据库系统的问题的能力是熟悉的。然而,使用LangChain和OpenAI的Whisper等现代工具实现它变得更加容易。在本文中,我们将看到如何操作。

创建单店云账号

上一篇文章介绍了创建免费SingleStoreDB云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: 耳语演示组
  • 云提供商: 自主技术
  • 地区: 美国东部 1(弗吉尼亚北部)
  • 工作区名称: 耳语演示
  • 大小: S-00
  • 设置: 未选择任何

工作区可用后,我们将记下 密码主机。主机可从 Whisper 演示组>概述>工作区> 耳语演示>直接连接> SQL IDE >主机>提供。稍后,我们将在 Python 应用程序中需要此信息。我们将通过在“ 耳语演示组>防火墙”下配置防火墙来暂时允许从任何地方访问。

创建数据库和表

在左侧导航窗格中,我们将选择 “开发”> SQL 编辑器 以创建 timeseries_db 数据库和 tick stock_sentiment 表,如下所示:

.SQL

 

CREATE DATABASE IF NOT EXISTS timeseries_db;

USE timeseries_db;

CREATE TABLE IF NOT EXISTS tick (
  symbol VARCHAR(10),
  ts DATETIME SERIES TIMESTAMP,
  open NUMERIC(18, 2),
  high NUMERIC(18, 2),
  low NUMERIC(18, 2),
  price NUMERIC(18, 2),
  volume INT,
  KEY(ts)
);

CREATE TABLE IF NOT EXISTS stock_sentiment (
  headline VARCHAR(250),
  positive FLOAT,
  negative FLOAT,
  neutral FLOAT,
  url TEXT,
  publisher VARCHAR(30),
  ts DATETIME,
  symbol VARCHAR(10)
);

创建管道

管道允许我们使用单个命令从各种来源(例如 Kafka、S3 和 HDFS)创建流式摄取源 对于我们的用例,我们将在 SingleStoreDB 中创建一个简单的管道,如下所示:

.SQL

 

CREATE PIPELINE tick
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/stockticker'
BATCH_INTERVAL 2500
INTO TABLE tick
FIELDS TERMINATED BY ','
(symbol,ts,open,high,low,price,volume);

我们将使用 BATCH_INTERVAL.最初,我们将此值设置为 2500 毫秒。

我们将管道配置为从最早的偏移量开始,如下所示:

.SQL

 

ALTER PIPELINE tick SET OFFSETS EARLIEST;

我们将在开始运行管道之前对其进行测试,如下所示:

.SQL

 

TEST PIPELINE tick LIMIT 1;

输出应类似于以下内容:

纯文本

 

+--------+---------------------+-------+-------+-------+-------+--------+
| symbol | ts                  | open  | high  | low   | price | volume |
+--------+---------------------+-------+-------+-------+-------+--------+
| AIV    | 2023-09-05 06:47:53 | 44.89 | 44.89 | 44.88 | 44.89 | 719    |
+--------+---------------------+-------+-------+-------+-------+--------+

请记住,这是虚构的数据

这些数据派生自 GitHub Gist ,是 Kaggle 数据的一小部分。 CSV 文件已包含情绪值。我们将下载此文件,然后使用 MySQL 客户端连接到 SingleStoreDB 云,如下所示:

 

mysql --local-infile -u admin -h <host> -P 3306 --default-auth=mysql_native_password -p

<host>替换为我们之前从SingleStoreDB Cloud获得的值。我们将从 CSV 文件将数据加载到 SingleStoreDB 中,如下所示:

.SQL

 

USE timeseries_db;

LOAD DATA LOCAL INFILE '/path/to/stock_sentiment.csv'
INTO TABLE stock_sentiment
IGNORE 1 LINES
COLUMNS TERMINATED BY '\t';

替换为 /path/to/ CSV 文件所在的实际路径。

加载笔记本

我们将使用 GitHub 上提供的笔记本文件。在 SingleStoreDB Cloud 的左侧导航窗格中,我们将选择 “笔记本”。网页的右上角将是 “新建笔记本 ”,其下拉列表有两个选项:

  1. 新笔记本
  2. 从文件导入

我们将选择第二个选项,找到我们从GitHub下载的笔记本文件并将其加载到SingleStoreDB Cloud中。

运行笔记本

我们将首先使用笔记本上方的下拉菜单选择 连接耳语演示)和 数据库timeseries_db)。

分析时间序列数据

笔记本的第 1 部分包含对表中数据的 tick 一组时序操作com/articles/using-singlestore-as-a-time-series-database“>上一篇文章。

LangChain OnlinePDFLoader

笔记本的第 2 部分将 PDF 文档的内容和矢量嵌入加载到名为 fintech_docs的表中。这些操作在 上一篇文章中有更详细的描述。替换为 <PDF document URL> 您选择的金融科技文件的超链接:

 

from langchain.document_loaders import OnlinePDFLoader

loader = OnlinePDFLoader("<PDF document URL>")

data = loader.load()

必须通过选择右上角的 “编辑防火墙 ”选项将 PDF 文件所在的完全限定域名 (FQDN) 添加到防火墙中。

我们将使用 ChatGPT 来回答有关 PDF 文件的问题,例如:

纯文本

 

"What are the best investment opportunities in Blockchain?"

LangChain SQL 代理

笔记本的第 3 部分包含对 和 tick stock_sentiment 表中数据的 LangChain 代理操作。这将是本文的主要重点。

首先,我们将安装一些库:

 

!pip install langchain --quiet
!pip install openai --quiet

现在,我们将输入我们的 OpenAI API Key

 

agent_executor.run(
"""
    From the tick table, which stock symbol saw the least volatility in share trading in the dataset?
"""
)

下面是一些示例输出:

纯文本

 

'The stock symbol with the least volatility in share trading in the dataset is FTR

run(query)’ data-lang=“text/x-python”>

query = input("Please enter your question:")
agent_executor.run(query)

我们将使用示例查询对此进行测试:

纯文本

 

Using the symbol A, what is the most positive sentiment in the stock_sentiment table and the current best price for this symbol from the tick table?

下面是一些示例输出:

纯文本

 

'The most positive sentiment for symbol A is 0.9576 and the current best price is 46.43.'

要查看链输出,我们将设置为 verbose True,如下所示:

 

agent_executor = create_sql_agent(
  llm = OpenAI(temperature = 0), toolkit = toolkit, verbose = True
)

上一篇文章中显示了另一个数据库的链输出示例。

奖励:构建可视化 Python 应用程序

我们现在将构建一个简单的Python应用程序,该应用程序使用OpenAI的Whisper来询问有关数据库系统的问题。一篇 出色的文章 启发了此应用程序。

安装所需的软件

在本文中,这是在 VMware Fusion 虚拟机中运行的 Ubuntu 22.04.2 全新安装所需的软件:

 

纯文本

 

langchain
matplotlib
openai
openai-whisper
pyaudio
pymysql
pyttsx3 wave

这些可以在 GitHub 上的文件中找到 requirements.txt 。按如下方式运行该文件:

 

pip install -r requirements.txt

openai-whisper 安装可能需要一段时间。

我们需要在我们的环境中提供一个 OpenAI API Key 。例如:

 

export OPENAI_API_KEY="<OpenAI API Key>" 

替换为 <OpenAI API Key> 您的密钥。

在以下每个应用程序中,我们都有以下代码:

 

s2_password = "<password>"
s2_host = "<host>"

我们将 <password> 用之前从 SingleStoreDB Cloud 保存的值替换 和 <host>

第一次迭代

让我们从一个简单的使用 Tkinter的可视化Python应用程序开始。我们还将使用OpenAI的Whisper添加语音识别。该应用程序最多可以录制 20 秒的语音。它可以按如下方式运行:

 

图1.第一次迭代。

第二次迭代

在下一次迭代中,我们将添加一个音频波形。我们将按如下方式运行该程序:

 

python3 record-transcribe-visualise.py

示例输出如图 2 所示。

图2.第二次迭代。

第三次迭代

在第三次也是最后一次迭代中,我们将删除应用程序基于文本的响应,并将其替换为语音合成。我们将按如下方式运行该程序:

 

python3 record-transcribe-visualise-speak.py

示例输出如图 3 所示。

图3.第三次迭代。

在所有三次迭代的代码中,我们可以调用 OpenAI Whisper API,而不是使用本地 Whisper 安装。为此,我们需要在函数中 transcribe_audio 取消注释这些代码行:

 

# transcript = openai.Audio.transcribe(
#     model = "whisper-1",
#     file = audio_file,
#     language = "en"
# )

并注释掉这行代码:

 

transcript = model.transcribe(filename)

但是,调用OpenAI Whisper API会产生额外的成本。本地的耳语安装已经提供了出色的结果。

总结

在本文中,我们已经看到,在不创建向量嵌入的情况下,我们已经能够使用 LangChain 非常有效地访问我们的数据。但是,我们的查询需要更加集中,并且在提出问题之前我们需要了解数据库架构。集成语音功能使我们的应用程序能够得到更广泛的应用。

Comments are closed.