ud_bulkload

概述

ud_bulkload是一种用于UnvDB的高速数据加载工具,相比copy命令。最大的优势就是速度,我们可以跳过shared buffer,wal buffer。直接写文件。ud_bulkloaddirect模式就是这种思路来实现的,它还包含了数据恢复功能,即导入失败的话,需要恢复。 ud_bulkload 旨在将大量数据加载到数据库中。您可以选择是否检查数据库约束以及在加载期间忽略多少错误。例如,当您将数据从另一个数据库复制到UnvDB时,您可以跳过性能完整性检查。另一方面,您可以在加载不干净的数据时启用约束检查。

验证安装

testdb=# create extension ud_bulkload;
CREATE EXTENSION

ud_bulkload 参数

下面介绍ud_bulkload主要的参数选项,主要有一下选项:

$ ud_bulkload --help

ud_bulkload is a bulk data loading tool for UNVDB

Usage:
  Dataload: ud_bulkload [dataload options] control_file_path
  Recovery: ud_bulkload -r [-D DATADIR]

Dataload options:
  -i, --input=INPUT         INPUT path or function
  -O, --output=OUTPUT       OUTPUT path or table
  -l, --logfile=LOGFILE     LOGFILE path
  -P, --parse-badfile=*     PARSE_BADFILE path
  -u, --duplicate-badfile=* DUPLICATE_BADFILE path
  -o, --option="key=val"    additional option

Recovery options:
  -r, --recovery            execute recovery
  -D, --uddata=DATADIR      database directory

Connection options:
  -d, --dbname=DBNAME       database to connect
  -h, --host=HOSTNAME       database server host or socket directory
  -p, --port=PORT           database server port
  -U, --username=USERNAME   user name to connect as
  -w, --no-password         never prompt for password
  -W, --password            force password prompt

Generic options:
  -e, --echo                echo queries
  -E, --elevel=LEVEL        set output message level
  --help                    show this help, then exit
  --version                 output version information, then exit

加载选项:

|选项|功能| |—-|—-| |-i INPUT –input=INPUT|数据源的位置,与控制文件里的”INPUT”类似| |-O OUTPUT –output=OUTPUT|数据的输出,比如数据库的某个表,与控制文件的”OUTPUT”类似| |-l LOGFILE –logfile=LOGFILE| 保存加载结果日志文件的路径,与控制文件的”LOGFILE”类似| |-o, –option=”key=val” |其他可用的选项,比如“TYPE=CSV”,”WRITER=PARALLEL”,可以指定多个选项|

连接选项

|选项|功能| |—-|—-| |-d, –dbname=DBNAME|指定需要连接的数据库,如果没有没有指定,默认使用环境变量,如果没有设定,默认使用用户名作为数据库名| |-h, –host=HOSTNAME|指定需要连接的主机地址| |-p, –port=PORT| 指定端口,默认端口是5678|

上面这三个参数通常都可以从环境变量得到,建议设置环境变量,因为ud_bulkload命令内部使用libpqlibpq需要这些环境变量。

通用选项

|选项|功能| |—-|—-| |–help |显示帮助信息| |–version|显示版本号|

控制文件

除了在命令行上指定参数外,还可以在控制文件中指定参数,下面介绍控制文件里的参数。 |选项|功能| |—-|—-| |TYPE = CSV | BINARY | FIXED | FUNCTION |CSV : 从 CSV 格式的文本文件加载 , 默认为CSV。 BINARY | FIXED:从固定的二进制文件加载。 FUNCTION :从函数的结果集中加载。如果使用它,INPUT 必须是调用函数的表达式。|
| WRITER | LOADER = DIRECT | BUFFERED | BINARY | PARALLEL |DIRECT :将数据直接加载到表中。绕过共享缓冲区并跳过 WAL 日志记录,但需要自己的恢复过程。这是默认的,也是原始旧版本的模式。BUFFERED:通过共享缓冲区将数据加载到表中。使用共享缓冲区,写入WAL,并使用原始 WAL 恢复。BINARY :将数据转换为二进制文件,该文件可用作要从中加载的输入文件。创建加载输出二进制文件所需的控制文件样本。此示例文件创建在与二进制文件相同的目录中,其名称为 .ctl。PARALLEL:与“WRITER=DIRECT”和“MULTI_PROCESS=YES”相同。如果指定了 PARALLEL,则忽略MULTI_PROCESS。如果为要加载的数据库配置了密码验证,则必须设置密码文件。| |TRUNCATE = YES | NO|如果YES,则使用 TRUNCATE 命令从目标表中删除所有行。如果NO,什么也不做。默认为NO。您不能同时指定“WRITER=BINARY”和 TRUNCATE。| |CHECK_CONSTRAINTS = YES | NO|指定在加载期间是否检查 CHECK 约束。默认为否。您不能同时指定“WRITER=BINARY”和 CHECK_CONSTRAINTS。| |PARSE_ERRORS = n|在解析、编码检查、编码转换、FILTER 函数、CHECK 约束检查、NOT NULL 检查或数据类型转换期间引发错误的 ingored 元组的数量。无效的输入元组不会加载并记录在 PARSE BADFILE 中。默认值为 0。如果解析错误数等于或多于该值,则提交已加载的数据并且不加载剩余的元组。0 表示不允许错误,-1 和 INFINITE 表示忽略所有错误。| |INPUT|INFILE=path|stdin|function_name|数据源,必须指定,类型不同,它的值也不一样:文件:如果是文件,此处就是路径,可以是相对路径,服务器必须有读文件的权限。stdin:ud_bulkload将从标准输入读取数据。SQL FUNCTION:指定SQL函数,用这个函数返回插入数据,可以是内建的函数,也可以是用户自定义的函数| |LOGFILE=path|日志文件的路径 ,执行过程中会记录状态。| |MULTI_PROCESS=YES|NO| 若设置了此值,会开启多线程模式,并行处理数据导入。若没设置,单线程模式,默认模式是单线程模式。| |VERBOSE=YES|NO|若设置了YES,坏的元组将写入服务器日志,默认是NO。| |SKIP|OFFSET=n|跳过的行数,默认是0,不能跟”TYPE=FUNCTION”同时设置。| |LIMIT|LOAD=n|限制加载的行数,默认是INFINITE,即加载所有数据,这个选项可以与”TYPE=FUNCTION”同时设置。 | |ENCODING = encoding|指定输入数据的编码。检查指定的编码是否有效,如果需要,将输入数据转换为数据库编码。默认情况下,输入数据的编码既不验证也不转换。| |FILTER = [ schema_name. ] function_name [ (argtype, … ) ]|指定过滤函数以转换输入文件中的每一行。只要函数名在数据库中是唯一的,就可以省略 argtype 的定义。如果未指定,则直接将输入数据解析为加载目标表。另请参阅如何编写 FILTER 函数以生成 FILTER 函数。不能同时指定“TYPE=FUNCTION”和 FILTER。此外,CSV 选项中的 FORCE_NOT_NULL 不能与 FILTER 选项一起使用。|

使用方法

初始化数据

testdb=# create table tb_asher (id int,name text);
CREATE TABLE
testdb=# \d
         List of relations
 Schema |   Name   | Type  | Owner 
--------+----------+-------+-------
 public | tb_asher | table | unvdb
(1 row)

testdb=# create extension ud_bulkload; #如果连接指定到单个库时,需要创建扩展以生成 udbulkload.ud_bulkload() 函数
CREATE EXTENSION
testdb=# quit

模拟CSV 文件

$ seq 100000| awk '{print $0"|asher"}' > bulk_asher.txt 
$ more bulk_asher.txt
1|asher
2|asher
3|asher
4|asher
5|asher
...

加载到指定表

将bulk_asher.txt里的数据加载到testdb 库下的 tb_asher表中

$ ud_bulkload -i /home/lihaozhan/bulk/bulk_asher.txt -O tb_asher -l  /home/lihaozhan/bulk/tb_asher_output.log -P /home/lihaozhan/bulk/tb_asher_bad.txt -o "TYPE=CSV" -o "DELIMITER=|" -d testdb -U unvdb -h 127.0.0.1

NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
	0 Rows skipped.
	100000 Rows successfully loaded.
	0 Rows not loaded due to parse errors.
	0 Rows not loaded due to duplicate errors.
	0 Rows replaced with new rows.

查看导入日志

$ cat /home/lihaozhan/bulk/tb_asher_output.log

ud_bulkload 3.1.20 on 2023-09-20 17:53:03.296287+08

INPUT = /home/lihaozhan/bulk/bulk_asher.txt
PARSE_BADFILE = /home/lihaozhan/bulk/tb_asher_bad.txt
LOGFILE = /home/lihaozhan/bulk/tb_asher_output.log
LIMIT = INFINITE
PARSE_ERRORS = 0
CHECK_CONSTRAINTS = NO
TYPE = CSV
SKIP = 0
DELIMITER = |
QUOTE = "\""
ESCAPE = "\""
NULL = 
OUTPUT = public.tb_asher
MULTI_PROCESS = NO
VERBOSE = NO
WRITER = DIRECT
DUPLICATE_BADFILE = /home/lihaozhan/data/soft/unvdb-data/ud_bulkload/20230920175303_testdb_public_tb_asher.dup.csv
DUPLICATE_ERRORS = 0
ON_DUPLICATE_KEEP = NEW
TRUNCATE = NO


  0 Rows skipped.
  100000 Rows successfully loaded.
  0 Rows not loaded due to parse errors.
  0 Rows not loaded due to duplicate errors.
  0 Rows replaced with new rows.

Run began on 2023-09-20 17:53:03.296287+08
Run ended on 2023-09-20 17:53:03.394545+08

CPU 0.02s/0.05u sec elapsed 0.10 sec

先清空在加载

增加了 -o “TRUNCATE=YES” 参数

$ ud_bulkload -i /home/lihaozhan/bulk/bulk_asher.txt -O tb_asher -l /home/lihaozhan/bulk/tb_asher_output.log -P /home/lihaozhan/bulk/tb_asher_bad.txt -o "TYPE=CSV" -o "DELIMITER=|" -o "TRUNCATE=YES" -d testdb -U unvdb -h 127.0.0.1

NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
	0 Rows skipped.
	100000 Rows successfully loaded.
	0 Rows not loaded due to parse errors.
	0 Rows not loaded due to duplicate errors.
	0 Rows replaced with new rows.

数据查询

$ ud_sql -h 127.0.0.1 -d testdb -c "select count(1) from tb_asher;"

 count  
--------
 100000
(1 row)

使用控制文件

新建控制文件asher.ctl ,可以根据之前加载时,产生的日志文件tb_asher_output.log来更改,去掉里面没有值的参数 NULL=

vi asher.ctl 

INPUT = /home/lihaozhan/bulk/bulk_asher.txt
PARSE_BADFILE = /home/lihaozhan/bulk/tb_asher_bad.txt
LOGFILE = /home/lihaozhan/bulk/tb_asher_output.log
LIMIT = INFINITE
PARSE_ERRORS = 0
CHECK_CONSTRAINTS = NO
TYPE = CSV
SKIP = 0
DELIMITER = |
QUOTE = "\""
ESCAPE = "\""
OUTPUT = public.tb_asher
MULTI_PROCESS = NO
VERBOSE = NO
WRITER = DIRECT
DUPLICATE_BADFILE = /home/lihaozhan/data/soft/unvdb-data/ud_bulkload/20230921101752_testdb_public_tb_asher.dup.csv
DUPLICATE_ERRORS = 0
ON_DUPLICATE_KEEP = NEW
TRUNCATE = YES

使用控制文件来加载

$ ud_bulkload /home/lihaozhan/bulk/asher.ctl -d testdb -U unvdb -h 127.0.0.1

NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
	0 Rows skipped.
	100000 Rows successfully loaded.
	0 Rows not loaded due to parse errors.
	0 Rows not loaded due to duplicate errors.
	0 Rows replaced with new rows.

数据查询

$ ud_sql -h 127.0.0.1 -d testdb -c "select count(1) from tb_asher;"

 count  
--------
 100000
(1 row)

强制写wal日志

ud_bulkload 默认是跳过buffer直接写文件 ,但时如果有复制 ,或者需要基本wal日志恢复时没有wal日志是不行的,这是我们可以强制让其写wal日志 ,只需要加载 -o “WRITER=BUFFERED” 参数就可以了

$ ud_bulkload -i /home/lihaozhan/bulk/bulk_asher.txt -O tb_asher -l /home/lihaozhan/bulk/tb_asher_output.log -P /home/lihaozhan/bulk/tb_asher_bad.txt -o "TYPE=CSV" -o "DELIMITER=|" -o "TRUNCATE=YES" -o "WRITER=BUFFERED" -d testdb -U unvdb -h 127.0.0.1

NOTICE: BULK LOAD START
NOTICE: BULK LOAD END
	0 Rows skipped.
	100000 Rows successfully loaded.
	0 Rows not loaded due to parse errors.
	0 Rows not loaded due to duplicate errors.
	0 Rows replaced with new rows.

其他

如果您使用直接加载模式(WRITER=DIRECT 或 PARALLEL),您必须注意以下事项:

  1. PITR/Replication :由于绕过了 WAL,PITR 的归档恢复不可用。这并不意味着它可以在没有加载表数据的情况下完成 PITR。 如果您想使用 PITR,请在通过 ud_bulkload 加载后对数据库进行完整备份。如果您使用流式复制,则需要根据 ud_bulkload 之后的备份集重新创建备用数据库。

  2. 尽量不要使用 “ kill -9” 终止 ud_bulkload 命令。如果您这样做了,您必须调用 UnvDB 脚本来执行 ud_bulkload恢复并重新启动 UnvDB 以继续。

  3. 默认情况下,在数据加载期间仅强制执行唯一约束和非空约束。您可以设置“CHECK_CONSTRAINTS=YES”来检查 CHECK 约束。无法检查外键约束。用户有责任提供有效的数据集。

  4. maintenance_work_mem会影响 ud_bulkload的性能。如果将此参数从 64 MB 更改为 1 GB,则持续时间将缩短近 15%。