官网地址
# StarRocks 数据库
# 简介
StarRocks 是开源的新一代极速全场景 MPP 数据库。它采用新一代的弹性 MPP 架构,可以高效支持大数据量级的多维分析、实时分析、 高并发
分析等多种 数据分析
场景。StarRocks 性能出色,它采用了全面 向量化
技术,比 同类产品
平均快 3-5 倍。
MPP:Massively Parallel Processing (大规模并行处理)
# 环境
版本 | 2.4.3 |
系统 | linux x86 |
编程语言 | jdk1.8(大于也行,环境变量已配置) |
# 安装与部署
# 前提条件
- 需要系统支持
avx2
指令,你可以通过以下命令查看系统是否支持
cat /proc/cpuinfo | grep avx2 | |
#如果有返回内容则代表支持,如: | |
[root@localhost ~]# cat /proc/cpuinfo | grep avx2 | |
flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch epb cat_l3 cdp_l3 intel_pt ssbd ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm cqm rdt_a rdseed adx smap xsaveopt cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local dtherm arat pln pts spec_ctrl intel_stibp flush_l1d |
- 如果系统不支持,可能是 arm 架构的系统。可以自己打包,或者找打包好的运行
# 安装
在官网下载对应系统的安装包。下载
上传到服务后解压
[root@localhost starrocks]# tar -zxvf StarRocks-2.4.3 | |
#解压后的结构 | |
[root@localhost StarRocks-2.4.3]# ll | |
总用量 2028 | |
drwxr-xr-x 5 1007 1007 40 1月 18 2023 apache_hdfs_broker | |
drwxr-xr-x 8 1007 1007 77 5月 12 2023 be | |
drwxr-xr-x 11 1007 1007 126 5月 12 2023 fe | |
-rw-rw-r-- 1 1007 1007 3858 1月 18 2023 LICENSE.txt | |
-rw-r--r-- 1 1007 1007 2071942 1月 18 2023 NOTICE.txt | |
drwxr-xr-x 4 1007 1007 32 1月 18 2023 udf |
路径 / 文件 | 说明 |
---|---|
apache_hdfs_broker | Broker 节点的部署路径。 |
fe | FE 节点的部署路径。 |
be | BE 节点的部署路径。 |
LICENSE.txt | StarRocks license 文件。 |
NOTICE.txt | StarRocks notice 文件。 |
# 部署
- 首先配置 FE 节点
#创建元数据存储路径。建议将元数据存储在与 FE 部署文件不同的路径中。请确保此路径存在并且您拥有写入权限 | |
# 进入 fe 目录 | |
cd fe | |
# 将 <meta_dir> 替换为您要创建的元数据目录。 <meta_dir > 和你配置的目录一致 | |
mkdir -p <meta_dir> | |
# 这里的目录直接就叫 | |
mkdir -p meta | |
# 修改配置 | |
# 在配置项 meta_dir 中指定元数据路径 | |
vi fe/conf/fe.conf | |
# 将 <meta_dir> 替换为您已创建的元数据目录。 | |
meta_dir = <meta_dir> | |
# 上面指定目录维 meta, 那么这里的目录为下面的 | |
meta_dir = meta |
fe.conf
更多高级配置项请参考 参数配置 - FE 配置项
- 启动 FE 节点
#后台启动 | |
./fe/bin/start_fe.sh --daemon |
- 确定 FE 节点成功启动
#通过查看指定日志内容确定启动情况 | |
cat fe/log/fe.log | grep thrift | |
#如果日志打印以下内容,则说明该 FE 节点启动成功: | |
"2022-08-10 16:12:29,911 INFO (UNKNOWN x.x.x.x_9010_1660119137253(-1)|1) [FeServer.start():52] thrift server started with port 9020." |
- 使用 mysql 连接工具连接 FE 节点确定状态
#通过 MySQL 客户端连接到 StarRocks。您需要使用初始用户 root 登录,密码默认为空 | |
# 将 <fe_address> 替换为 Leader FE 节点的 IP 地址(priority_networks)或 FQDN, | |
# 并将 <query_port>(默认:9030)替换为您在 fe.conf 中指定的 query_port。 | |
mysql -h <fe_address> -P<query_port> -uroot | |
例如:mysql -h 192.168.0.1 -P9030 -uroot | |
#可以用 navicat 或 sqlyago 工具连接 | |
#连接后查看状态 | |
SHOW PROC '/frontends'\G | |
#示例: | |
MySQL [(none)]> SHOW PROC '/frontends'\G | |
*************************** 1. row *************************** | |
Name: x.x.x.x_9010_1686810741121 | |
IP: x.x.x.x | |
EditLogPort: 9010 | |
HttpPort: 8030 | |
QueryPort: 9030 | |
RpcPort: 9020 | |
Role: LEADER | |
ClusterId: 919351034 | |
Join: true | |
Alive: true | |
ReplayedJournalId: 1220 | |
LastHeartbeat: 2023-06-15 15:39:04 | |
IsHelper: true | |
ErrMsg: | |
StartTime: 2023-06-15 14:32:28 | |
Version: 3.0.0-48f4d81 | |
1 row in set (0.01 sec) | |
如果字段 Alive 为 true,说明该 FE 节点正常启动并加入集群。 | |
如果字段 Role 为 FOLLOWER,说明该 FE 节点有资格被选为 Leader FE 节点。 | |
如果字段 Role 为 LEADER,说明该 FE 节点为 Leader FE 节点。 |
- 配置 BE 节点
#创建数据存储路径。建议将数据存储在与 BE 部署文件不同的路径中。请确保此路径存在并且您拥有写入权限 | |
# 进入 be 目录 | |
cd be | |
# 将 <storage_root_path> 替换为您要创建的数据存储路径。 <storage_root_path > 和你配置的目录一致 | |
mkdir -p <storage_root_path> | |
# 这里的目录直接就叫 | |
mkdir -p storage | |
# 修改配置 | |
# 在配置项 storage_root_path 中指定元数据路径 | |
vi be/conf/be.conf | |
# 将 <storage_root_path> 替换为您已创建的元数据目录。 | |
storage_root_path = <storage_root_path> | |
# 上面指定目录为 storage, 那么这里的目录为下面的 | |
storage_root_path = storage |
be.conf
更多高级配置项请参考 参数配置 - BE 配置项
- 启动 BE 节点
./be/bin/start_be.sh --daemon |
- 确定 FE 节点成功启动
#通过查看指定日志内容确定启动情况 | |
cat be/log/be.INFO | grep heartbeat | |
#如果日志打印以下内容,则说明该 BE 节点启动成功: | |
"I0614 17:41:39.782819 3717531 thrift_server.cpp:388] heartbeat has started listening port on 9050" |
- 将 BE 节点添加到 FE 中,查看 BE 状态
#通过 MySQL 客户端连接到 StarRocks FE 节点。您需要使用初始用户 root 登录,密码默认为空 | |
# 将 <fe_address> 替换为 Leader FE 节点的 IP 地址(priority_networks)或 FQDN, | |
# 并将 <query_port>(默认:9030)替换为您在 fe.conf 中指定的 query_port。 | |
mysql -h <fe_address> -P<query_port> -uroot | |
#可以用 navicat 或 sqlyago 工具连接 | |
#将 BE 节点添加到 FE | |
-- 将 <be_address> 替换为 BE 节点的 IP 地址(priority_networks), | |
-- 并将 <heartbeat_service_port>(默认:9050)替换为您在 be.conf 中指定的 heartbeat_service_port。 | |
ALTER SYSTEM ADD BACKEND "<be_address>:<heartbeat_service_port>", "<be2_address>:<heartbeat_service_port>", "<be3_address>:<heartbeat_service_port>"; | |
例如:ALTER SYSTEM ADD BACKEND "192.168.0.1:9050"; | |
#添加成功后 查看添加结果 | |
SHOW PROC '/backends'\G | |
MySQL [(none)]> SHOW PROC '/backends'\G | |
*************************** 1. row *************************** | |
BackendId: 10007 | |
IP: 172.26.195.67 | |
HeartbeatPort: 9050 | |
BePort: 9060 | |
HttpPort: 8040 | |
BrpcPort: 8060 | |
LastStartTime: 2023-06-15 15:23:08 | |
LastHeartbeat: 2023-06-15 15:57:30 | |
Alive: true | |
SystemDecommissioned: false | |
ClusterDecommissioned: false | |
TabletNum: 30 | |
DataUsedCapacity: 0.000 | |
AvailCapacity: 341.965 GB | |
TotalCapacity: 1.968 TB | |
UsedPct: 83.04 % | |
MaxDiskUsedPct: 83.04 % | |
ErrMsg: | |
Version: 3.0.0-48f4d81 | |
Status: {"lastSuccessReportTabletsTime":"2023-06-15 15:57:08"} | |
DataTotalCapacity: 341.965 GB | |
DataUsedPct: 0.00 % | |
CpuCores: 16 | |
NumRunningQueries: 0 | |
MemUsedPct: 0.01 % | |
CpuUsedPct: 0.0 % | |
如果字段 Alive 为 true,说明该 BE 节点正常启动并加入集群。 |
# 使用
- 基本的使用和 mysql 的语法差不多,可能只有少部分不兼容
- 数据基本是外部导入产生的
# 表设计
# 表概述
表是数据存储单元。理解 StarRocks 中的表结构,以及如何设计合理的表结构,有利于优化数据组织,提高查询效率。相比于传统的数据库,StarRocks 会以列的方式存储 JSON、ARRAY 等复杂的半结构化数据,保证高效查询。
与其他关系型数据库一样,StarRocks 表在逻辑上由行(Row)和列(Column)构成
建表只需要在 CREATE TABLE 语句中定义列和列的数据类型,即可创建一张表
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE user_access (
uid int,
name varchar(64),
age int,
phone varchar(16),
last_access datetime,
credits double
)
DUPLICATE KEY(uid, name);
-- 上述建表示例创建了明细表,该表中数据不具有任何约束,相同的数据行可能会重复存在。并且指定明细表中前两列为排序列,构成排序键。数据按排序键排序后存储,有助于查询时的快速索引
-- 可以执行 DESCRIBE 查看表结构
MySQL [example_db]> DESCRIBE user_access;
+-------------+-------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-------+---------+-------+
| uid | int | YES | true | NULL | |
| name | varchar(64) | YES | true | NULL | |
| age | int | YES | false | NULL | |
| phone | varchar(16) | YES | false | NULL | |
| last_access | datetime | YES | false | NULL | |
| credits | double | YES | false | NULL | |
+-------------+-------------+------+-------+---------+-------+
6 rows in set (0.00 sec)
-- 执行 SHOW CREATE TABLE 来查看建表语句 mysql 也是根据这个语句来查看建表语句
MySQL [example_db]> SHOW CREATE TABLE user_access\G
*************************** 1. row ***************************
Table: user_access
Create Table: CREATE TABLEuser_access
(uid
int(11) NULL COMMENT "",name
varchar(64) NULL COMMENT "",age
int(11) NULL COMMENT "",phone
varchar(16) NULL COMMENT "",last_access
datetime NULL COMMENT "",credits
double NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(uid
,name
)
DISTRIBUTED BY RANDOM
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);
1 row in set (0.00 sec)
如果测试环境中集群仅包含一个 BE,可以在 PROPERTIES
中将副本数设置为 1
,即 PROPERTIES( "replication_num" = "1" )
。默认副本数为 3
,也是生产集群推荐的副本数。如果您需要使用默认设置,也可以不配置 replication_num
参数。
表类型
StarRocks 提供四种类型的表,包括明细表、主键表、聚合表和更新表,适用于存储多种业务数据,例如原始数据、实时频繁更新的数据和聚合数据。
- 明细表简单易用,表中数据不具有任何约束,相同的数据行可以重复存在。该表适用于存储不需要约束和预聚合的原始数据,例如日志等。
- 主键表能力强大,具有唯一性非空约束。该表能够在支撑实时更新、部分列更新等场景的同时,保证查询性能,适用于实时查询(一般用在实时报表)。
- 聚合表适用于存储预聚合后的数据,可以降低聚合查询时所需扫描和计算的数据量,极大提高聚合查询的效率。
- 更新表适用于实时更新的业务场景,目前已逐渐被主键表取代。
数据分布
StarRocks 采用分区 + 分桶的两级数据分布策略,将数据均匀分布各个 BE 节点。查询时能够有效裁剪数据扫描量,最大限度地利用集群的并发性能,从而提升查询性能。
-- 通过 show 相关命令可以查询到分区、分桶情况
-- 分区的情况
SHOW TABLET FROM [<db_name>.]<table_name> PARTITION(partition_name)
-- 分桶的情况
SHOW TABLET <tablet_id>
分区:
相当于把数据分成多个区间,一般是根据时间范围分成多个区间(比如一个星期分一次区间), 或者是特定的值列表分区(比如特定的城市分区)
分桶:
在一个分区中在分一层,将数据存储在更小的单元中
StarRocks 提供两种分桶方式:
- 哈希分桶:根据数据的分桶键值,将数据划分至分桶。选择查询时经常使用的条件列组成分桶键,能有效提高查询效率。
- 随机分桶:随机划分数据至分桶。这种分桶方式更加简单易用。
# 表对比
starrocks 的表提供了特定的几个类型。下面是各类表的能力对比
主键表 (Primary Key table) | 明细表 (Duplicate Key table) | 聚合表 (Aggregate table) | 更新表 (Unique Key table) | |
---|---|---|---|---|
Key 列和唯一约束 | 主键(Primary Key)具有唯一约束和非空约束。 | Duplicate Key 不具有唯一约束。 | 聚合键(Aggregate Key)具有唯一约束。 | 唯一键(Unique Key)具有唯一约束。 |
Key 列和数据变更的关系(逻辑关系) | 基于主键对数据进行增删改操作。如果新数据的主键值与表中原数据的主键值相同,则存在唯一约束冲突,此时新数据会替代原数据。与更新表相比,主键表增强了其底层存储引擎,已经可以取代更新表。 | Duplicate Key 不具有唯一约束,因此如果新数据的 Duplicate Key 与表中原数据相同,则新旧数据都会存在表中。 | 基于聚合键对数据进行增删改操作。如果新数据与表中原数据存在唯一约束冲突,则会根据聚合键和 Value 列的聚合函数聚合新旧数据。 | 基于唯一键对数据进行增删改操作。如果新数据与表中原数据存在唯一约束冲突,则新数据会替代原数据。更新表实际可以视为聚合函数为 replace 的聚合表。 |
Key 列和排序键的关系 | 自 3.0 起,两者解耦。 | 两者耦合。 | ||
Key 列和排序键支持的数据类型 | 数值(包括整型、布尔)、字符串、时间日期。 | 数值(包括整型、布尔、Decimal)、字符串、时间日期。 | ||
Key 和分区 / 分桶列的关系 | 分区列、分桶列必须在主键中。 | 无 | 分区列、分桶列必须在聚合键中。 | 分区列、分桶列必须在唯一键中。 |
# 主键表
主键表能够在支撑实时数据更新的同时,也能保证高效的查询性能
场景:
- 实时对接数据,数据是唯一的,并且数据的更新和删除操作多。实时同步增删改的数据至主键表,可以简化数据同步流程
- 有实时统计的报表,保证数据是最新的统计结果
使用说明
建表例子:
CREATE TABLE orders1 (
order_id bigint NOT NULL,
dt date NOT NULL,
user_id INT NOT NULL,
good_id INT NOT NULL,
cnt int NOT NULL,
revenue int NOT NULL
)
PRIMARY KEY (order_id)
DISTRIBUTED BY HASH (order_id);
由于主键表仅支持分桶策略为哈希分桶,因此您还需要通过
DISTRIBUTED BY HASH ()
定义哈希分桶键。
# 明细表
明细表是默认创建的表类型。如果在建表时未指定任何 key,默认创建的是明细表。
创建表时,支持定义排序键。如果查询的过滤条件包含排序键,则 StarRocks 能够快速地过滤数据,提高查询效率。明细表适用于日志数据分析等场景,支持追加新数据,不支持修改历史数据。
场景:
- 分析原始数据,例如原始日志、原始操作记录等
- 查询方式灵活,不需要局限于预聚合的分析方式。
- 导入日志数据或者时序数据,主要特点是旧数据不会更新,只会追加新的数据。
使用说明
例如,需要分析某时间范围的某一类事件的数据,则可以将事件时间(
event_time
)和事件类型(event_type
)作为排序键。在该业务场景下,建表语句如下:
CREATE TABLE IF NOT EXISTS detail (
event_time DATETIME NOT NULL COMMENT "datetime of event",
event_type INT NOT NULL COMMENT "type of event",
user_id INT COMMENT "id of user",
device_code INT COMMENT "device code",
channel INT COMMENT ""
)
DUPLICATE KEY(event_time, event_type)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
"replication_num" = "3"
);
# 聚合表
建表时,支持定义排序键和指标列,并为指标列指定聚合函数。当多条数据具有相同的排序键时,指标列会进行聚合。在分析统计和汇总数据时,聚合表能够减少查询时所需要处理的数据,提升查询效率。适用于分析统计和汇总数据。
场景:
- 通过分析网站或 APP 的访问流量,统计用户的访问总时长、访问总次数。
- 广告厂商为广告主提供的广告点击总量、展示总量、消费统计等。
- 通过分析电商的全年交易数据,获得指定季度或者月份中,各类消费人群的爆款商品。
上面这些情况不需要知道数据的明细是什么,只关心 max、sum、count 等聚合的结果
创建表
-- 例如需要分析某一段时间内,来自不同城市的用户,访问不同网页的总次数。则可以将网页地址 site_id、日期 date 和城市代码 city_code 作为排序键,将访问次数 pv 作为指标列,并为指标列 pv 指定聚合函数为 SUM。
在该业务场景下,建表语句如下:
CREATE TABLE IF NOT EXISTS example_db.aggregate_tbl (
site_id LARGEINT NOT NULL COMMENT "id of site",
date DATE NOT NULL COMMENT "time of event",
city_code VARCHAR(20) COMMENT "city_code of user",
pv BIGINT SUM DEFAULT "0" COMMENT "total page views"
)
AGGREGATE KEY(site_id, date, city_code)
DISTRIBUTED BY HASH(site_id)
PROPERTIES (
"replication_num" = "3"
);
# 语法注意点
mysql | starrocks | |
---|---|---|
自动生成汇总行 | with rollup | 不支持,需要用 union all 在统计汇总行 |
子查询 | select a, (select b from B) from A; | 不支持,需要想办法拆分条件 |
# 常用的 sql 语句
修改用户密码
ALTER USER 'jack' IDENTIFIED BY '123456';
ALTER USER jack@'172.10.1.10' IDENTIFIED WITH mysql_native_password BY '123456';
# 优化经验
有一张几亿的表查询很慢,当时我的统计表会根据年份筛选,那么这里我就选择根据年份分区,这样查询就会很快
CREATE TABLE gz_balance (
sno int,
years INT,
name string
)
ENGINE=olap
PRIMARY KEY(sno, years)
PARTITION BY RANGE (years) (
START ("2014") END ("2060") EVERY (1)
)
DISTRIBUTED BY HASH(sno, years) BUCKETS 8
PROPERTIES (
"replication_num" = "1"
);
select a.*,dd.distname,2015 nf,4 yf,'全部' lx from
(select ifnull(distno,'01') distno,zts,bqs,sqs,zxl from
(select case when LENGTH('01')<=4 then left(a.linkdistid,6) else a.linkdistid end distno,
count(distinct ifnull(b.acc_set,c.acc_set)) zts,
convert(sum(b.debit_month),DECIMAL) bqs,
convert(sum(c.debit_month),DECIMAL) sqs,
case when sum(c.debit_month)=0 then 0 else convert(sum(b.debit_month)/sum(c.debit_month)*100,DECIMAL) end zxl
from gz_acc_set a left outer join gz_balance1 b on a.acc_set=b.acc_set and b.YEARs=2015 and b.months=4 and b.acc_code like '522%' left outer join gz_balance1 c on a.acc_set=c.acc_set and c.YEARs*12+c.months=2015*12+4-1 and c.acc_code like '522%' where b.debit_month+c.debit_month>=1 and a.dwlx =case when '全部'='全部' then dwlx else '全部' end and a.linkDistId like '01%' group by case when LENGTH('01')<=4 then left(a.linkdistid,6) else a.linkdistid end )a)a,scandist dd where a.distno=dd.distno
从原本的10几秒 缩减到2秒以内
# smt 工具使用
StarRocks Migration Tool 工具是 StarRocks 专门为了从其他关系型数据生成对应到 starrocks 数据库表结构的工具, 支持从 mysql、sqlserver (2014)、oracle 等数据库导出,还可以生产对应的 flinksql
官方下载地址
修改配置文件 config_prod.conf
[db]
host = 127.0.0.1
port = 1433
user = sa
password = 123456
# currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
type = sqlserver
# # only takes effect on `type == hive`.
# # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
# authentication = kerberos
[other]
# number of backends in StarRocks
# be 节点数
be_num = 1
# `decimal_v3` is supported since StarRocks-1.8.1
use_decimal_v3 = true
# directory to save the converted DDL SQL
# 输出的目录
output_dir = shq_bjaccount
# !!!`database` `table` `schema` are case sensitive in `oracle`!!!
[table-rule.1]
# pattern to match databases for setting properties
# !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
database = shq_bjaccount
# pattern to match tables for setting properties
table = .*
# `schema` only takes effect on `postgresql` and `oracle` and `sqlserver`
schema = dbo
############################################
### starrocks table configurations
############################################
# # set a column as the partition_key
# partition_key = p_key
# # override the auto-generated partitions
# partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day)
# # only take effect on tables without primary keys or unique indexes
# duplicate_keys=k1,k2
# # override the auto-generated distributed keys
# distributed_by=k1,k2
# # override the auto-generated distributed buckets
bucket_num=8
# # properties.xxxxx: properties used to create tables
# properties.in_memory = false
properties.replication_num = 1
properties.enable_persistent_index = true
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=${sink_jdbc_url}
flink.starrocks.load-url=${sink_load_url}
flink.starrocks.username=${sink_username}
flink.starrocks.password=${sink_password}
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=1000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true
#防止 json 数据过大导致错误
flink.starrocks.sink.properties.ignore_json_size=true
# # used to set the server-id for mysql-cdc jobs instead of using a random server-id
# flink.cdc.server-id = 5000
############################################
### flink-cdc configuration for `tidb`
############################################
# # Only takes effect on TiDB before v4.0.0.
# # TiKV cluster's PD address.
# flink.cdc.pd-addresses = 127.0.0.1:2379
#可以配置 source 端的相关参数
flink.cdc.debezium.snapshot.isolation.mode=read_committed
flink.cdc.debezium.snapshot.lock.timeout.ms=-1
############################################
### flink-cdc plugin configuration for `postgresql`
############################################
# # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
# # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
# # and https://debezium.io/documentation/reference/postgres-plugins.html
# flink.cdc.decoding.plugin.name = decoderbufs
执行程序生成对应的文件
./starrocks-migrate-tool
# 导入数据
本篇只介绍从 flink
中实时导入数据