通过示例的帮助,此博客文章将指导您如何使用Azure 数据资源管理器 Go SDK从 Azure Blob 存储容器中收集数据,并使用 SDK 以编程方式查询数据。在快速概述如何设置 Azure 数据资源管理器群集(和数据库)后,我们将探索代码以了解发生了什么(以及如何),最后使用简单的 CLI 接口测试应用程序

示例数据是可从这里下载的CSV 文件

该代码可在GitHub https://github.com/abhirockzz/azure-dataexplorer-go

什么是 Azure 数据资源管理器?

Azure 数据资源管理器(也称为Kusto)是一种快速且可扩展的数据探索服务,用于分析来自任何数据源(如网站、应用程序、IoT 设备等)的大量不同数据。然后,此数据可用于诊断、监视、报告、机器学习和其他分析功能。

支持几种引入方法,包括事件中心等常见服务的连接器、使用 SDK(如.NETPython)的编程引入,以及直接访问引擎以进行探索。它还与分析和建模服务集成,使用诸如

转到 Azure 数据资源管理器的 SDK

Go 客户端 SDK允许您使用 Go 查询、控制和引入 Azure 数据资源管理器群集。请注意,这是用于与 Azure 数据资源管理器群集(以及相关组件(如表等)进行交互。若要创建 Azure 数据资源管理器群集、数据库等,应使用管理组件(控制平面)SDK,这是大型 Azure SDK 的一部分。

API 文档 – https://godoc.org/github.com/Azure/azure-kusto-go

在开始之前,下面是试用示例应用程序所需的

先决条件

安装 Go 1.13 或以上

您将需要一个微软 Azure 帐户。继续注册一个免费的!

如果还没有 Azure CLI,请安装它(应该很快!

设置 Azure 数据资源管理器群集、创建数据库和配置安全性

首先使用 az kusto 群集创建群集。完成操作后,创建一个数据库,

Java

 

 
1
azkusto群集创建-l"中央我们"-nMyadxcluster-gn--Myadxresgrp-skuStandard_D11_v2-容量2 g
3
azkustodatabase数据库创建-群集-名称Myadxcluster-gMyadxresgrp-ngnMyadxdb
4

5
azkusto数据库显示-群集--名称Myadxcluster-名称Myadxdb-资源-Myadxresgrp

使用az ad sp 创建为 rbac 创建服务主体

Java

 

x
1
 
1
azadsp创建-for-rbac-n"测试数据 x - sp"

您将得到 JSON 响应 , 请注意 , appId password tenant 您将在后续步骤中使用它们

Java

 

x
1
 
1
{
2
"appId""fe7280c7-5705-4789-b17f-71a472340429"
3
"显示名称""测试数据 x - sp"
4
"名字""http://test-datax-sp",
“密码” :”29c719dd-f2b3-46de-b71c-4004fb6116ee”

6
"租户""42f988bf -86f1-42af-91ab-2d7cd011db42"
7
}

您需要将角色分配给服务主体,以便它可以访问您刚刚创建的数据库。若要使用 Azure 门户进行导航,请打开 Azure 数据资源管理器群集,导航 Data > Databases 到并选择数据库。选择 Permissions 左菜单,然后单击 Add 以继续。

有关详细信息,请参阅 Azure 中的“安全 Azure 数据资源管理器”群集

代码演练

在高级别上,这是示例代码所起的:

  • 连接到 Azure 数据资源管理器群集(当然!
  • 创建一个表(并列出它们只是为了确定)
  • 创建数据映射
  • 从 Azure Blob 存储中的 CSV 文件中输入/加载现有数据
  • 对刚刚摄入的数据运行查询

让我们看看每个步骤

连接到 Azure 数据资源管理器群集

我们使用服务主体对 Azure 数据资源管理器进行身份验证,并提供 Azure 租户 ID、客户端 ID 和客户端密钥(使用 创建主体后获取 az ad sp create-for-rbac )。

Java

 

x
1
1
func创建表(kc_kusto.客户端kustoDB字符串) |
2
\err :\kc.Mgmt上下文.背景库斯托德库斯托NewStmt创建表命令))
3
如果错误! []
4
日志致命"创建表失败"错误
5
  }
6

日志。Printf(”表% s 创建\n”, kustotable)

8
}

您可以在此处查看代码

请注意我们如何使用客户端。Mgmt以执行此操作,因为这是一个 management 查询。稍后,您将了解如何执行查询以从 Azure 数据资源管理器读取数据。

为了确认,我们运行一个查询来检查数据库中的表,即show tables

Java

 

X
1
26
1
func查找表(kc_kusto.客户端kustoDB字符串) [表信息]
2
var=表信息
3
rierr错误:\kc.Mgmt上下文.背景库斯托德库斯托NewStmt测试查询))
4
如果错误! []
5
日志Fatalf"执行查询 %s - %s"testQuery错误失败
7
vart表信息
8
对于
9
错误:\ri.下一个()
10
如果错误! []
11
如果错误\io.Eof
12
中断
13

14
日志Println"错误"错误
15
  }
16
  }
17
结构和/t
18
=追加t
19
  }
20
返回
21

22
...
23
键入Tableinfo结构|
24
名称字符串"库斯托"表名"'
25
DB字符串"库斯托:"数据库名称"'`
26
}

您可以在此处查看代码

执行查询后 ToStruct ,用于将结果保存到用户定义的结构 TableInfo 的实例

创建表后,我们可以配置在引入过程中使用的数据映射,以将传入数据映射到 Kusto 表中的列

Java

 

func Csvfromblob (kc _ kusto.客户端、 blobstore 帐户名称、 blobstoreContainer 、 blobstoretoken 、 blobstore 文件名、 kustomapingrefname 、 kustodb 、 kustotable 字符串 ) |
谢谢, 错误: = 摄入。新(kc, 库斯托德布, 库斯托表)
如果错误! [零]
日志。致命("未能创建引入客户端",错误)
}
BlobStorePath :\ fmt.Sprintf(blobStorePathFormat, blobStore 帐户名称, blobstoreContainer, blobStoreFilename, blobstoreToken)
错误 = ingest. fromfile (上下文。背景(),blobStorePath,摄入。文件格式(正在摄入。CSV),摄入。引入映射引用 (kustomappingRefname, 摄入。Csv))

如果错误! [零]
日志。致命("无法提交文件",错误)
}
日志。Println("从 -",BlobStorePath 的"包含的文件"
[数据-lang="文本/x-java"}

x
1
15
 
1
constblobStorePathFormat="https://%s.blob. core. windows. net/%s/%s% s"

3
funcCsvfromblob(kc_kusto.客户端blobstore 帐户名称blobstoreContainerblobstoretokenblobstore 文件名kustomapingrefnamekustodbkustotable字符串) |
4
最错误 =摄入kckustodbkustoTable
5
如果错误! []
6
日志致命"未能创建摄入客户端"错误
7
  }
8
Sprintf(blobStorePathFormatblobStoreAccountNamesblobStoreContainerblobStoreFilenameblobStoreToken

9
错误=克林斯特FromFile上下文背景blobStore路径摄入文件格式摄入.CSV摄入.引入映射Ref(kustomapingref引入.CSV))
10

11
如果错误! []
12
日志致命"无法摄入文件"错误
13
  }
日志。Println"从 -" 的"从 -"删除文件

15
}

您可以在此处查看代码

我们在 Azure Blob 存储中具有文件的路径,我们在函数中引用该文件类型 FromFileCSV 本例中)以及我们刚刚创建的数据映射 StormEvents_CSV_Mapping ()

查询数据

我们使用以下查询从 StormEvents 表中获取一些数据:

Java

 

x
1

1
风暴事件|其中事件类型="洪水" 和状态State= "华盛顿"=sort排序由伤害财产dec|项目开始时间结束时间源损坏属性

这一次, client.Query 我们使用( Mgmt 不是 )从表中读取数据。

Java

 

x
1
28

1
func获取(kc_kusto.客户端kustoDBstring字符串 ) [风暴细节]
2
var事件=风暴细节
3
rierr错误:\kc.查询上下文)。背景库斯托德库斯托NewStmt查询))
4

5
如果错误! []
6
Fatalf("执行查询 %s - %s",查询错误

7
  }
8
对于
9
错误:\ri.下一个()
10
如果错误! []
11
如果错误\io.Eof
12
中断
13

14
日志Println"错误"错误
15
  }
16
  }
17
var事件风暴细节
18
ToStruct事件
19
事件=追加事件事件
20
  }
返回事件

22
...
23
类型风暴尾结构|
24
开始时间时间'库斯托"开始时间"'
25
结束时间时间'库斯托"结束时间"'
26
字符串 '库斯托'来源''
27
伤害int32'库斯托 :"损坏财产"'
}

结果中的每一行都转换为 StormDetail 结构,使用ToStruct

您可以在此处查看代码

然后 StormDetail ,s 列表以用户友好的表格格式显示为粗壮

Java

 

x
1
14

1
...
2
数据 [字符串]
3
对于[详细信息[范围详细信息]
4
数据=追加数据, +字符串+详细信息开始字符串详细信息结束字符串详细信息strconv.伊托(int详细信息) 。伤害)])
5
}
6
Println"风暴事件数据..."

7
=写表器新作家(os.Stdout
8
设置头 (+字符串="开始时间""结束时间""从""损坏"=)
9

10
对于_v: [范围数据]
11
追加v
12
}
13
渲染()

14
...

您可以在此处查看代码

最后,要放下桌子,我们使用.drop table StormEvents

Java

 

X
1
 
康斯特滴表 Q] = ". drop 表风暴事件"

2

3
func滴表(kc_kusto.客户端) |
4
\err :\kc.Mgmt上下文.背景库斯托德库斯托NewStmt滴表Q))
5
如果错误! []
6
日志致命"未能掉落表 - "错误
7

8
}

您可以在此处查看代码

运行示例

现在,您已经了解了发生了什么,让我们使用 CLI 试用一下

设置所需的环境变量

Java

 

x
1

1
导出AZURE_SP_CLIENT_ID="服务主体客户端 ID"
2
导出AZURE_SP_CLIENT_SECRET="<服务主要客户机密>"
3
导出AZURE_SP_TENANT_ID="<租户 ID>"
4
#e.ghttps//mykusto.southeastasia.kusto.windows.net
5
导出KUSTO_ENDPOINT="https://<群集名称><azure 区域>.kusto.windows.net"

获取代码并生成它

Java