wal2json

配置 unvdbsvr.conf

在 unvdbsvr.conf 文件中,至少需要设置以下参数:

wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

注意:更改这些参数后,需要重启数据库服务。

插件参数说明

wal2json 支持丰富的参数,常用参数如下。这些参数在读取变更时指定,例如通过 ud_recvlogical 的 -o 选项,或通过 pg_logical_slot_get_changes 等 SQL 函数的选项参数(见下文示例):

参数名 说明 默认值
include-xids 每个变更集是否包含事务ID(xid) false
include-timestamp 每个变更集是否包含时间戳 false
include-schemas 每个变更是否包含schema信息 true
include-types 每个变更是否包含类型信息 true
include-typmod 类型是否包含修饰符(如varchar(20)) true
include-type-oids 是否包含类型OID false
include-domain-data-type 域类型是否替换为底层数据类型 false
include-column-positions 是否包含列位置(pg_attribute.attnum) false
include-origin 是否包含数据来源 false
include-not-null 是否包含非空信息 false
include-default 是否包含默认表达式 false
include-pk 是否包含主键信息(列名和数据类型) false
numeric-data-types-as-string 数字类型是否以字符串输出 false
pretty-print JSON结构是否美化输出(空格和缩进) false
write-in-chunks 是否在每次变更后立即写入,而非每个变更集写入一次(仅format-version=1时有效) false
include-lsn 是否包含nextlsn false
include-transaction 是否输出每个事务的开始和结束记录 true
filter-origins 排除指定origin的数据(逗号分隔) 空(不过滤)
filter-tables 排除指定表的数据(逗号分隔,需schema限定) 空(不过滤)
add-tables 仅包含指定表的数据(与filter-tables规则相同) 全部表
filter-msg-prefixes 排除指定前缀的消息(逗号分隔) 空(不过滤)
add-msg-prefixes 仅包含指定前缀的消息(逗号分隔) 全部前缀
format-version 输出格式版本(1或2) 1
actions 指定输出哪些操作(insert, update, delete, truncate) 全部

例子

有两种方法可以从 wal2json 插件获取更改(JSON 对象):通过 ud_recvlogical 或者 SQL 函数。

ud_recvlogical

除了上述配置外,使用 ud_recvlogical 还需要配置复制连接。在 V17.4、17.5 和 17.6 中,逻辑复制连接要求 ud_hba.conf 的数据库列填写 `replication` 关键字;从版本 18 开始,逻辑复制连接会匹配数据库列为数据库名称或 `all` 等关键字的普通条目。

首先,在 ud_hba.conf(17.4、17.5 和 17.6)中添加复制连接规则:

local    replication     myuser                     trust

如果您使用的是版本 18 或更高版本:

local    mydatabase      myuser                     trust

另外,ud_recvlogical 要求 unvdbsvr.conf 中的 max_wal_senders 至少为 1(如果已按上文设置为 10,则无需修改):

max_wal_senders = 1

如果更改了 max_wal_senders,需要重启数据库服务才能生效。

现在可以试用 wal2json 了。在一个终端中执行:

$ ud_recvlogical -d unvdb --slot test_slot --create-slot -P wal2json
$ ud_recvlogical -d unvdb --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -

在另一个终端中:

CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

BEGIN;
INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table1_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

DROP TABLE table1_with_pk;
DROP TABLE table1_without_pk;

CREATE TABLE
CREATE TABLE
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78BFC828
3/78BFC880
DELETE 2
3/78BFC990
INSERT 0 1
UPDATE 1
COMMIT
DROP TABLE
DROP TABLE

第一个终端中的输出为:

{
    "change": [
    ]
}
{
    "change": [
    ]
}
{
    "change": [
        {
            "kind": "message",
            "transactional": false,
            "prefix": "wal2json",
            "content": "this non-transactional message will be delivered even if you rollback the transaction"
        }
    ]
}
WARNING:  table "table1_without_pk" without primary key or replica identity is nothing
{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "table1_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [1, "Backup and Restore", "2018-03-27 11:58:28.988414"]
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table1_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [2, "Tuning", "2018-03-27 11:58:28.988414"]
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table1_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [3, "Replication", "2018-03-27 11:58:28.988414"]
        }
        ,{
            "kind": "message",
            "transactional": true,
            "prefix": "wal2json",
            "content": "this message will be delivered"
        }
        ,{
            "kind": "delete",
            "schema": "public",
            "table": "table1_with_pk",
            "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [1, "2018-03-27 11:58:28.988414"]
            }
        }
        ,{
            "kind": "delete",
            "schema": "public",
            "table": "table1_with_pk",
            "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [2, "2018-03-27 11:58:28.988414"]
            }
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table1_without_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "numeric(5,2)", "text"],
            "columnvalues": [1, 2.34, "Tapir"]
        }
    ]
}
{
    "change": [
    ]
}
{
    "change": [
    ]
}

在第一个终端中按 Ctrl+C 停止 ud_recvlogical,然后删除复制槽:

Ctrl+C
$ ud_recvlogical -d unvdb --slot test_slot --drop-slot

SQL 函数

CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table2_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;

上面的脚本生成以下输出:

CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
    "change": [
        {
            "kind": "message",
            "transactional": false,
            "prefix": "wal2json",
            "content": "this non-transactional message will be delivered even if you rollback the transaction"
        }
    ]
}
ud_sql:/tmp/example2.sql:17: WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "table2_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table2_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table2_with_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
            "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
        }
        ,{
            "kind": "message",
            "transactional": true,
            "prefix": "wal2json",
            "content": "this message will be delivered"
        }
        ,{
            "kind": "delete",
            "schema": "public",
            "table": "table2_with_pk",
            "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [1, "2018-03-27 12:05:29.914496"]
            }
        }
        ,{
            "kind": "delete",
            "schema": "public",
            "table": "table2_with_pk",
            "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [2, "2018-03-27 12:05:29.914496"]
            }
        }
        ,{
            "kind": "insert",
            "schema": "public",
            "table": "table2_without_pk",
            "columnnames": ["a", "b", "c"],
            "columntypes": ["integer", "numeric(5,2)", "text"],
            "columnvalues": [1, 2.34, "Tapir"]
        }
    ]
}
stop
DROP TABLE
DROP TABLE

下面使用 format-version 2 重复相同的示例:


CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table3_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;

上面的脚本生成以下输出:


CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
ud_sql:/tmp/example3.sql:20: WARNING:  no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE