wal2json
配置 unvdbsvr.conf
在 unvdbsvr.conf 文件中,至少需要设置以下参数:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
注意:更改这些参数后,需要重启数据库服务。
插件参数说明
wal2json 支持丰富的参数,常用参数如下(可在创建逻辑复制槽或订阅时指定):
| 参数名 | 说明 | 默认值 |
|---|---|---|
| include-xids | 每个变更集是否包含事务ID(xid) | false |
| include-timestamp | 每个变更集是否包含时间戳 | false |
| include-schemas | 每个变更是否包含schema信息 | true |
| include-types | 每个变更是否包含类型信息 | true |
| include-typmod | 类型是否包含修饰符(如varchar(20)) | true |
| include-type-oids | 是否包含类型OID | false |
| include-domain-data-type | 域类型是否替换为底层数据类型 | false |
| include-column-positions | 是否包含列位置(pg_attribute.attnum) | false |
| include-origin | 是否包含数据来源 | false |
| include-not-null | 是否包含非空信息 | false |
| include-default | 是否包含默认表达式 | false |
| include-pk | 是否包含主键信息(列名和数据类型) | false |
| numeric-data-types-as-string | 数字类型是否以字符串输出 | false |
| pretty-print | JSON结构是否美化输出(空格和缩进) | false |
| write-in-chunks | 是否每次变更后写入(仅format-version=1时有效) | false |
| include-lsn | 是否包含nextlsn | false |
| include-transaction | 是否输出每个事务的开始和结束记录 | true |
| filter-origins | 排除指定origin的数据(逗号分隔) | 空 |
| filter-tables | 排除指定表的数据(逗号分隔,需schema限定) | 空 |
| add-tables | 仅包含指定表的数据(与filter-tables规则相同) | 全部表 |
| filter-msg-prefixes | 排除指定前缀的消息(逗号分隔) | 空 |
| add-msg-prefixes | 仅包含指定前缀的消息(逗号分隔) | 全部前缀 |
| format-version | 输出格式版本(1或2) | 1 |
| actions | 指定输出哪些操作(insert, update, delete, truncate) | 全部 |
例子
有两种方法可以从 wal2json 插件获取更改(JSON 对象):通过 ud_recvlogical 或者 SQL 函数。
ud_recvlogical
除了上述配置外,还需要配置复制连接才能使用 ud_recvlogical。在 17.4、17.5 和 17.6 版本中,逻辑复制连接需要在数据库列中填写 replication 关键字。从版本 18 开始,逻辑复制匹配数据库列为数据库名称或关键字(例如 all)的普通条目。
首先,在 ud_hba.conf(17.4、17.5 和 17.6)中添加复制连接规则:
local replication myuser trust
如果您使用的是版本 18 或更高版本:
local mydatabase myuser trust
另外,在 unvdbsvr.conf 中设置max_wal_senders:
max_wal_senders = 1
如果您更改了 max_wal_senders,则需要重新启动数据库服务。
您已准备好尝试 wal2json。在一个终端中:
$ ud_recvlogical -d unvdb --slot test_slot --create-slot -P wal2json
$ ud_recvlogical -d unvdb --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
在另一个终端中:
-- Example 1: statements to run in a second terminal while ud_recvlogical
-- is streaming changes from test_slot in the first terminal.
-- One table has a composite primary key (a, c); the other has no primary key.
CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
BEGIN;
INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
-- Transactional message whose prefix matches add-msg-prefixes=wal2json.
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
-- Transactional message with prefix 'pgoutput'; it does not match
-- add-msg-prefixes=wal2json, so it is left out of the stream.
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
-- Deletes the rows with a = 1 and a = 2 (reported as "DELETE 2").
DELETE FROM table1_with_pk WHERE a < 3;
-- Non-transactional message (first argument is false).
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
-- This UPDATE is not added to the stream because the table has neither
-- a primary key nor a replica identity.
UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
-- Clean up the example tables.
DROP TABLE table1_with_pk;
DROP TABLE table1_without_pk;
上面的语句生成以下输出:
CREATE TABLE
CREATE TABLE
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78BFC828
3/78BFC880
DELETE 2
3/78BFC990
INSERT 0 1
UPDATE 1
COMMIT
DROP TABLE
DROP TABLE
第一个终端中的输出为:
{
"change": [
]
}
{
"change": [
]
}
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
WARNING: table "table1_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table1_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 11:58:28.988414"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table1_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 11:58:28.988414"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table1_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 11:58:28.988414"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table1_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 11:58:28.988414"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table1_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 11:58:28.988414"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table1_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
{
"change": [
]
}
{
"change": [
]
}
在第一个终端中停止接收并删除复制槽:
Ctrl+C
$ ud_recvlogical -d unvdb --slot test_slot --drop-slot
SQL 函数
-- Example 2: the same workload, read back through the SQL function
-- interface instead of ud_recvlogical.
-- One table has a composite primary key (a, c); the other has no primary key.
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
-- Create the logical replication slot test_slot with the wal2json output
-- plugin; the query prints 'init'.
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
-- Transactional message whose prefix matches the add-msg-prefixes option
-- passed when the changes are read below.
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
-- Transactional message with prefix 'pgoutput'; it does not match
-- add-msg-prefixes, so it is left out of the output.
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
-- Deletes the rows with a = 1 and a = 2 (reported as "DELETE 2").
DELETE FROM table2_with_pk WHERE a < 3;
-- Non-transactional message (first argument is false).
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- This UPDATE is not added to the stream because the table has neither
-- a primary key nor a replica identity.
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
-- Read the changes from the slot with pretty-printed JSON, keeping only
-- messages whose prefix is 'wal2json'. Options are passed as name/value pairs.
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
-- Drop the slot; the query prints 'stop'.
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
-- Clean up the example tables.
DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;
上面的脚本生成以下输出:
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
ud_sql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
让我们使用 format-version 2 重复相同的示例:
-- Example 3: the same workload, read back with format-version 2.
-- One table has a composite primary key (a, c); the other has no primary key.
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
-- Create the logical replication slot test_slot with the wal2json output
-- plugin; the query prints 'init'.
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
-- Transactional message whose prefix matches the add-msg-prefixes option
-- passed when the changes are read below.
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
-- Transactional message with prefix 'pgoutput'; it does not match
-- add-msg-prefixes, so it is left out of the output.
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
-- Deletes the rows with a = 1 and a = 2 (reported as "DELETE 2").
DELETE FROM table3_with_pk WHERE a < 3;
-- Non-transactional message (first argument is false).
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- This UPDATE is not added to the stream because the table has neither
-- a primary key nor a replica identity.
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
-- Read the changes from the slot using output format version 2, keeping
-- only messages whose prefix is 'wal2json'.
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
-- Drop the slot; the query prints 'stop'.
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
-- Clean up the example tables.
DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;
上面的脚本生成以下输出:
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
ud_sql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE