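Logical Publication and Subscription
Physical replication slots are typically used for primary-standby replication. Logical replication lets you publish selected tables and selected operations through a logical replication slot. The subscriber then replays those changes, which gives you on-demand data synchronization.
This section is a hands-on walkthrough of logical publication and subscription. It covers the following key points:
Primary, standby, and cascading standby
Physical replication slots
Logical replication slots
Publishing on primary 1
Subscribing on primary 2
Environment architecture
Primary A: 192.168.29.51:5001 -> Standby A1: 192.168.29.51:5002 -> Standby A2 (publisher): 192.168.29.51:5003 -> Primary B (subscriber): 192.168.29.51:6001
Note: publishing from a standby is supported in version 24 and later. Earlier versions can only publish from the primary.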
Goal
Replicate some of node A's tables to node B through the cascading standby A2.
Configuration steps
Install node A
[unvdb@local-dev51 soft]$ pwd
/data/aiops2/data/24/soft
[unvdb@local-dev51 soft]$ ./setup.sh
===============================================================================
Choose Data Folder
------------------
Please choose a data folder for this installation.
The data folder must be empty and writable.
Default Data Folder: /data/aiops2/data/24/unvdb-data
Enter the absolute path,
or press <ENTER> key to accept the default,
: /data/aiops2/data/24/5001
===============================================================================
Port
----
Please enter UnvDB listen port.
Port (Default: 5678): 5001
===============================================================================
Modify the relevant parameters
[unvdb@local-dev51 24]$ pwd
/data/aiops2/data/24
[unvdb@local-dev51 24]$ cat >> 5001/unvdbsvr.conf <<EOF
port = 5001
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
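The same five parameters are appended to unvdbsvr.conf for every node (5001, 5002, 5003 and 6001), and only the port changes. The helper below is a minimal sketch of that step. append_node_conf is a hypothetical name, and the sketch assumes the layout used in this walkthrough: a data directory named after its port, with commands run from /data/aiops2/data/24. Appending a second time should be safe. PostgreSQL uses the last occurrence of a parameter in the file, and UnvDB is assumed to behave the same way. Under that rule, the port = 5001 line that ud_basebackup copies to a standby is overridden by the newly appended port.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append the walkthrough's replication settings to a
# node's unvdbsvr.conf. Assumes (as in PostgreSQL) that the last occurrence
# of a parameter wins, so a port copied over by ud_basebackup is overridden.
append_node_conf() {
  local datadir=$1 port=$2
  cat >> "$datadir/unvdbsvr.conf" <<EOF
port = $port
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
}

# Usage (from /data/aiops2/data/24, once each data directory exists):
# append_node_conf 5001 5001
# append_node_conf 5002 5002
```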
Start the database
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 5001 -l 5001.log
starting UnvDB as a demo version
waiting for server to start.... done
server started
Generate test data
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001
unvdb=# create database d1;
CREATE DATABASE
unvdb=# \c d1
You are now connected to database "d1" as user "unvdb".
d1=# create table t1 (id int); -- t1 has no primary key
CREATE TABLE
d1=# insert into t1 (id) values(1);
INSERT 0 1
d1=# create table t2 (id SERIAL PRIMARY KEY,name text);
CREATE TABLE
d1=# insert into t2 (name) values ('a');
INSERT 0 1
d1=# select * from t1,t2;
id | id | name
----+----+------
1 | 1 | a
(1 row)
Configure standby A1
[unvdb@local-dev51 24]$ ./soft/bin/ud_basebackup -p 5001 -D 5002 -Fp -Xs -P -R -S slot2 -C --checkpoint=fast
30685/30685 kB (100%), 1/1 tablespace
[unvdb@local-dev51 24]$ ls
5001 5001.log 5002 soft
[unvdb@local-dev51 24]$ cat >> 5002/unvdbsvr.conf <<EOF
port = 5002
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 5002 -l 5002.log
Configure standby A2
[unvdb@local-dev51 24]$ ./soft/bin/ud_basebackup -p 5002 -D 5003 -Fp -Xs -P -R -S slot3 -C --checkpoint=fast
[unvdb@local-dev51 24]$ ls
5001 5001.log 5002 5002.log 5003
[unvdb@local-dev51 24]$ cat >> 5003/unvdbsvr.conf <<EOF
port = 5003
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
vi 5003/unvdbsvr.auto.conf
primary_conninfo = 'user=unvdb passfile=''/data/aiops2/data/appdb/.pgpass'' channel_binding=prefer port=5002 sslmode=prefer sslcompression=0 sslcertmode=allow sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=disable krbsrvname=unvdbsvr gssdelegation=0 target_session_attrs=any load_balance_hosts=disable' # the upstream node here is 5002 (A1)
primary_slot_name = 'slot3'
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 5003 -l 5003.log
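ud_basebackup -R already writes a primary_conninfo that points at the server it copied from. That is why A2's upstream port is 5002. If a cascading standby later has to be re-pointed, for example from A1 to A, you can change the port inside that long string with a small sed helper. The sketch below is one way to do it. set_upstream_port is a hypothetical name, and it only rewrites the port= keyword on the primary_conninfo line. Reload or restart the standby afterwards so that the new setting takes effect.

```shell
#!/usr/bin/env bash
# Hypothetical helper: change the upstream port in a standby's
# primary_conninfo without touching any other line in the file.
set_upstream_port() {
  local file=$1 port=$2
  # Match "port=NNNN" preceded by a space or the opening quote, only on the
  # primary_conninfo line. Copy the result back with cat (not mv) so the
  # original file keeps its permissions.
  sed -E "/^primary_conninfo/ s/([ '])port=[0-9]+/\1port=${port}/" "$file" > "$file.tmp" &&
    cat "$file.tmp" > "$file" &&
    rm -f "$file.tmp"
}

# Usage: re-point A2 from A1 (5002) to A (5001)
# set_upstream_port 5003/unvdbsvr.auto.conf 5001
```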
Verify physical replication status
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from pg_replication_slots';
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
slot2 | | physical | | | f | t | 49839 | | | 0/3000348 | | reserved | | f |
(1 row)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5002 d1 -c'select * from pg_replication_slots';
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
slot3 | | physical | | | f | t | 50438 | | | 0/3000348 | | reserved | | f |
(1 row)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5003 d1 -c'select * from pg_replication_slots';
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
(0 rows)
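The slot checks above are done by eye. To script them, fetch the slot list in unaligned, tuples-only form and look for slots whose active column is not t. This sketch assumes that ud_sql accepts psql's -A and -t options, which produce unaligned output with | as the field separator. report_inactive_slots is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "slot_name|active" rows on stdin and print the
# name of every slot whose active flag is not "t".
report_inactive_slots() {
  local name active
  # "|| [ -n "$name" ]" also handles a last line without a trailing newline.
  while IFS='|' read -r name active || [ -n "$name" ]; do
    [ -z "$name" ] && continue        # skip blank lines
    if [ "$active" != "t" ]; then
      printf '%s\n' "$name"
    fi
  done
}

# Usage: check the physical slots on A (slot2) and A1 (slot3)
# for p in 5001 5002; do
#   ./soft/bin/ud_sql -p "$p" d1 -At -c 'SELECT slot_name, active FROM pg_replication_slots' |
#     report_inactive_slots
# done
```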
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c"insert into t2(name) values ('b')";
INSERT 0 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t2';
id | name
----+------
1 | a
2 | b
(2 rows)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5002 d1 -c'select * from t2';
id | name
----+------
1 | a
2 | b
(2 rows)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5003 d1 -c'select * from t2';
id | name
----+------
1 | a
2 | b
(2 rows)
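Install node B
Take a base backup directly from node A. Because -R is not used, B starts as an independent primary rather than as a standby.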
[unvdb@local-dev51 24]$ ./soft/bin/ud_basebackup -p 5001 -D 6001 -Fp -Xs -P --checkpoint=fast
[unvdb@local-dev51 24]$ cat >> 6001/unvdbsvr.conf <<EOF
port = 6001
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 6001 -l 6001.log
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t2;'
id | name
----+------
1 | a
2 | b
(2 rows)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'create table t3(id int);'
CREATE TABLE
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'insert into t3(id) values(1);'
INSERT 0 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t3;'
id
----
(0 rows)
Create a publication on node A
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1
d1=# CREATE PUBLICATION pub_name FOR TABLE t1,t2 WITH (publish = 'insert, update, delete'); -- create publication pub_name, publishing only INSERT/UPDATE/DELETE on tables t1 and t2
CREATE PUBLICATION
d1=# SELECT * FROM pg_publication; -- list all publications
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+----------+----------+--------------+-----------+-----------+-----------+-------------+------------
16401 | pub_name | 10 | f | t | t | t | f | f
(1 row)
Create a subscription on node B
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1
ud_sql (24.2)
Type "help" for help.
d1=# CREATE SUBSCRIPTION sub_incremental CONNECTION 'host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1' PUBLICATION pub_name WITH ( slot_name = 'slot6001',create_slot = true, copy_data = false);-- create subscription sub_incremental: specify the publisher (A2) and the publication name, create the logical slot automatically, and do not copy existing data
NOTICE: created replication slot "slot6001" on publisher
CREATE SUBSCRIPTION
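The CREATE SUBSCRIPTION command blocks at this point. To create a logical slot on standby A2, the server has to wait for a running-transactions snapshot record from the primary. Run pg_log_standby_snapshot() on primary A to write that record. After A2 replays it, the slot is created and the command returns.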
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001
unvdb=# SELECT pg_log_standby_snapshot();
pg_log_standby_snapshot
-------------------------
0/50040C8
(1 row)
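The subscriber session blocks until primary A writes a snapshot record, so these two steps need two sessions. The sketch below runs both from one shell. It starts CREATE SUBSCRIPTION in the background on B and then calls pg_log_standby_snapshot() on A. subscribe_via_standby and the UD_SQL variable are hypothetical. The ports and the database name come from this walkthrough. The 2-second delay is an arbitrary guess, not a tuned value. If the command is still waiting after the call, calling pg_log_standby_snapshot() again should let it finish.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper; UD_SQL defaults to the client path used in this walkthrough.
UD_SQL=${UD_SQL:-./soft/bin/ud_sql}

# Create a subscription whose publisher is a standby (A2) from a single shell.
subscribe_via_standby() {
  local create_sql=$1 delay=${2:-2}
  # Runs on subscriber B (6001); blocks while the logical slot is created on A2.
  "$UD_SQL" -p 6001 d1 -c "$create_sql" &
  local pid=$!
  # Give A2 a moment to start waiting for a running-transactions record.
  sleep "$delay"
  # Written on primary A (5001) and replayed on A1 and A2, which unblocks the slot creation.
  "$UD_SQL" -p 5001 -c 'SELECT pg_log_standby_snapshot();' || return 1
  # Return the exit status of the background CREATE SUBSCRIPTION.
  wait "$pid"
}

# Usage:
# subscribe_via_standby "CREATE SUBSCRIPTION sub_incremental CONNECTION 'host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1' PUBLICATION pub_name WITH (slot_name = 'slot6001', create_slot = true, copy_data = false);"
```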
Query the subscription information on node B
d1=# SELECT * FROM pg_subscription;
oid | subdbid | subskiplsn | subname | subowner | subenabled | subbinary | substream | subtwophasestate | subdisableonerr | subpasswordrequired | subrunasowner | subconninfo | subslotname | subsynccommit | subpublications | suborigin
-------+---------+------------+-----------------+----------+------------+-----------+-----------+------------------+-----------------+---------------------+---------------+----------------------------------------------------------+-------------+---------------+-----------------+-----------
24579 | 16384 | 0/0 | sub_incremental | 10 | t | f | f | d | f | t | f | host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1 | slot6001 | off | {pub_name} | any
(1 row)
Replication slot slot6001 has now been created automatically on standby A2
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5003 d1 -c'SELECT * FROM pg_replication_slots;'
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
slot6001 | pgoutput | logical | 16384 | d1 | f | t | 52656 | | 749 | 0/5004178 | 0/50041B0 | reserved | | f | f
(1 row)
Verify data synchronization
Existing data on node A
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t1;select * from t2;'
id
----
1
(1 row)
id | name
----+------
1 | a
2 | b
(2 rows)
Insert data on node A
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c"insert into t1(id) values(2);insert into t2(name) values('c');"
INSERT 0 1
INSERT 0 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t1;select * from t2;'
id
----
1
2
(2 rows)
id | name
----+------
1 | a
2 | b
3 | c
(3 rows)
Node B's data has been synchronized from node A
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t1;select * from t2;'
id
----
1
2
(2 rows)
id | name
----+------
1 | a
2 | b
3 | c
(3 rows)
Verify deletes
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'delete from t2 where id=1'
DELETE 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t2;'
id | name
----+------
2 | b
3 | c
4 | d
(3 rows)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t2;'
id | name
----+------
2 | b
3 | c
4 | d
(3 rows)
Deleting rows from a table without a primary key
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'delete from t1 where id=1'
ERROR: cannot delete from table "t1" because it does not have a replica identity and publishes deletes
HINT: To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.
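The delete fails for two reasons: t1 has no primary key, so it has no replica identity, and the publication publishes deletes.
To fix this, either add a primary key: ALTER TABLE t1 ADD PRIMARY KEY (id);
or set the table's replica identity: ALTER TABLE t1 REPLICA IDENTITY FULL;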
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'ALTER TABLE t1 REPLICA IDENTITY FULL;'
ALTER TABLE
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'delete from t1 where id=1'
DELETE 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t1;'
id
----
2
3
(2 rows)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t1;'
id
----
2
3
(2 rows)
Adding table t4 to the publication
Publisher side (primary A; standby A2 is read-only)
d1=# create table t4 (id SERIAL PRIMARY KEY,name text);
CREATE TABLE
d1=# insert into t4(name) values('a');
INSERT 0 1
ALTER PUBLICATION pub_name ADD table t4;
Subscriber side (node B)
d1=# create table t4 (id SERIAL PRIMARY KEY,name text);-- create the same table structure
CREATE TABLE
ALTER SUBSCRIPTION sub_incremental REFRESH PUBLICATION;-- refresh the subscription so that it picks up t4
ALTER SUBSCRIPTION sub_incremental DISABLE;-- stop the subscription
ALTER SUBSCRIPTION sub_incremental ENABLE;-- start the subscription
d1=# select * from t4;
Simulating a replication break and resolving it
On the subscriber, manually insert a row with id=4 to provoke an id conflict
d1=# select * from t4;
id | name
----+------
1 | a
2 | b
3 | b
(3 rows)
d1=# insert into t4 (id,name) values(4,'A');
INSERT 0 1
d1=# select * from t4;
id | name
----+------
1 | a
2 | b
3 | b
4 | A
(4 rows)
On the publisher, insert another row. It is also assigned id=4
d1=# insert into t4 (name) values('A0');
INSERT 0 1
d1=# select * from t4;
id | name
----+------
1 | a
2 | b
3 | b
4 | A0
(4 rows)
Replication now stops, and the subscriber log shows:
2024-07-18 14:27:11.856 CST [60946] LOG: logical replication apply worker for subscription "sub_incremental" has started
2024-07-18 14:27:11.873 CST [60946] ERROR: duplicate key value violates unique constraint "t4_pkey"
2024-07-18 14:27:11.873 CST [60946] DETAIL: Key (id)=(4) already exists.
2024-07-18 14:27:11.873 CST [60946] CONTEXT: processing remote data for replication origin "pg_24579" during message type "INSERT" for replication target relation "public.t4" in transaction 764, finished at 0/50576B8
2024-07-18 14:27:11.875 CST [51568] LOG: background worker "logical replication worker" (PID 60946) exited with exit code 1
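The CONTEXT line contains both values needed to skip the failing transaction. The first is the replication origin name (pg_24579). The second is the transaction's finish LSN (finished at 0/50576B8). The sketch below extracts both from the subscriber log with sed. parse_apply_error is a hypothetical name, and the pattern assumes the message wording shown above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read subscriber log lines on stdin and, for each
# apply-error CONTEXT line, print "<origin_name> <finish_lsn>".
# Lines that do not match the pattern produce no output.
parse_apply_error() {
  sed -n -E 's/.*replication origin "([^"]+)".*finished at ([0-9A-Fa-f]+\/[0-9A-Fa-f]+).*/\1 \2/p'
}

# Usage: the most recent failure in B's log (started with -l 6001.log)
# parse_apply_error < 6001.log | tail -n 1
```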
Publisher (A2) log, as the apply worker keeps reconnecting:
2024-07-18 14:28:01.909 CST [61226] LOG: 0/5057390 has been already streamed, forwarding to 0/5057518
2024-07-18 14:28:01.909 CST [61226] STATEMENT: START_REPLICATION SLOT "slot6001" LOGICAL 0/5057390 (proto_version '4', origin 'any', publication_names '"pub_name"')
2024-07-18 14:28:01.911 CST [61226] LOG: starting logical decoding for slot "slot6001"
2024-07-18 14:28:01.911 CST [61226] DETAIL: Streaming transactions committing after 0/5057518, reading WAL from 0/5057478.
2024-07-18 14:28:01.911 CST [61226] STATEMENT: START_REPLICATION SLOT "slot6001" LOGICAL 0/5057390 (proto_version '4', origin 'any', publication_names '"pub_name"')
2024-07-18 14:28:01.912 CST [61226] LOG: logical decoding found consistent point at 0/5057478
2024-07-18 14:28:01.912 CST [61226] DETAIL: There are no running transactions.
2024-07-18 14:28:01.912 CST [61226] STATEMENT: START_REPLICATION SLOT "slot6001" LOGICAL 0/5057390 (proto_version '4', origin 'any', publication_names '"pub_name"')
d1=# SELECT * FROM pg_stat_subscription_stats;-- check subscription statistics
subid | subname | apply_error_count | sync_error_count | stats_reset
-------+-----------------+-------------------+------------------+-------------
24579 | sub_incremental | 56 | 0 |
(1 row)
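Skipping the failed transaction manually
SELECT * FROM pg_subscription; -- look up the subscription OID (24579)
ALTER SUBSCRIPTION sub_incremental DISABLE; -- the origin cannot be advanced while an apply worker is using it
SELECT pg_replication_origin_advance('pg_24579', '0/50576B9'::pg_lsn); -- pg_24579 is the replication origin name ("pg_" + subscription OID); the LSN is the `finished at 0/50576B8` value from the error log plus 1
ALTER SUBSCRIPTION sub_incremental ENABLE;
Alternatively, ALTER SUBSCRIPTION sub_incremental SKIP (lsn = '0/50576B8'); skips the same transaction. It takes the finish LSN exactly as it appears in the log.
Note: replication then resumes, but the skipped transaction's changes are never applied on the subscriber. The data may therefore be inconsistent and must be repaired by hand.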
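The LSN passed to pg_replication_origin_advance is the finish LSN from the log plus one byte. An LSN is printed as two hexadecimal halves (the high and low 32 bits), so the addition has to be done in hex and may carry into the high half. The sketch below does that arithmetic and prints the statement. lsn_next and origin_advance_sql are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the LSN one byte after the given "HI/LO" LSN,
# in the same upper-case hex format the server uses (e.g. 0/50576B8).
lsn_next() {
  local hi=$(( 16#${1%%/*} )) lo=$(( 16#${1##*/} + 1 ))
  if (( lo > 0xFFFFFFFF )); then   # low half overflowed: carry into the high half
    lo=0
    hi=$(( hi + 1 ))
  fi
  printf '%X/%X\n' "$hi" "$lo"
}

# Hypothetical helper: build the skip statement from the origin name and the
# "finished at" LSN reported in the subscriber log.
origin_advance_sql() {
  printf "SELECT pg_replication_origin_advance('%s', '%s'::pg_lsn);\n" "$1" "$(lsn_next "$2")"
}

# Usage:
# origin_advance_sql pg_24579 0/50576B8
# prints: SELECT pg_replication_origin_advance('pg_24579', '0/50576B9'::pg_lsn);
```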
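Notes
The publisher and the subscriber normally both accept writes, which can leave the data inconsistent.
Publication/subscription does not guarantee any bound on replication lag.
Other operations
Publication commands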
d1=# \h alter PUBLICATION
Command: ALTER PUBLICATION
Description: change the definition of a publication
Syntax:
ALTER PUBLICATION name ADD publication_object [, ...]
ALTER PUBLICATION name SET publication_object [, ...]
ALTER PUBLICATION name DROP publication_object [, ...]
ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] )
ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION name RENAME TO new_name
where publication_object is one of:
TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ]
TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ]
d1=# \h drop publication
Command: DROP PUBLICATION
Description: remove a publication
Syntax:
DROP PUBLICATION [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]
CREATE PUBLICATION pub_name FOR ALL TABLES WITH (publish = 'insert, update, delete, truncate'); -- publish the specified operations for all tables
SELECT * FROM pg_publication_tables; -- list the published tables
Subscription commands
d1=# \h ALTER SUBSCRIPTION
Command: ALTER SUBSCRIPTION
Description: change the definition of a subscription
Syntax:
ALTER SUBSCRIPTION name CONNECTION 'conninfo'
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ADD PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name DROP PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] )
ALTER SUBSCRIPTION name SKIP ( skip_option = value )
ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION name RENAME TO new_name
d1=# \h drop SUBSCRIPTION
Command: DROP SUBSCRIPTION
Description: remove a subscription
Syntax:
DROP SUBSCRIPTION [ IF EXISTS ] name [ CASCADE | RESTRICT ]
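CREATE SUBSCRIPTION sub_incremental CONNECTION 'host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1' PUBLICATION pub_name WITH (slot_name = 'slot_standby', create_slot = true, copy_data = false, enabled = false, origin = none); -- create a subscription
Row filters are not a subscription option. They are defined on the publication, e.g. CREATE PUBLICATION pub_filtered FOR TABLE t2 WHERE (id > 100); (pub_filtered is an illustrative name).
Parameter | Type | Description | Default |
---|---|---|---|
CONNECTION | string | Connection string for the publisher, in libpq format | - |
PUBLICATION | text | Names of the publications to subscribe to | - |
copy_data | boolean | Whether to copy existing data when the subscription starts | true |
create_slot | boolean | Whether to create the replication slot on the publisher | true |
enabled | boolean | Whether to enable the subscription immediately | true |
slot_name | text | Name of the replication slot to use | subscription name |
synchronous_commit | enum | synchronous_commit setting used by the subscription's apply workers | off |
connect | boolean | Whether to connect to the publisher immediately | true |
binary | boolean | Whether to transfer data in binary format | false |
streaming | enum | Whether in-progress transactions are streamed (off/on/parallel) | off |
two_phase | boolean | Whether two-phase commit is enabled | false |
disable_on_error | boolean | Whether to disable the subscription automatically on error | false |
run_as_owner | boolean | Whether to apply changes with the subscription owner's privileges rather than as each table's owner | false |
password_required | boolean | Whether the connection must supply a password | true |
origin | text | Apply only changes that have no origin (none) or all changes (any) | any |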
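SELECT * FROM pg_stat_subscription_stats; -- view subscription statistics
pg_stat_subscription_stats provides statistics about logical replication subscriptions. Use it to monitor their health and performance.
| Column| Type| Description|
| - | - | - |
|subid| oid| OID of the subscription|
|subname| name| Name of the subscription|
|apply_error_count| bigint| Number of errors while applying changes|
|sync_error_count| bigint| Number of errors during initial table synchronization|
|stats_reset| timestamp with time zone| Time at which these statistics were last reset|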
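apply_error_count keeps climbing while the apply worker retries a conflicting change (it reached 56 in the conflict example earlier), so these counters make a simple alert signal. The sketch below is meant for a cron job or monitoring agent. It assumes that ud_sql supports psql's -A/-t output options, and check_sub_errors is a hypothetical helper. The counters are cumulative, so an alert keeps firing until the statistics are reset.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "subname|apply_error_count|sync_error_count" rows
# on stdin, print every subscription with non-zero error counters, and
# return 1 if any were found (so a caller can raise an alert).
check_sub_errors() {
  local name apply sync found=0
  while IFS='|' read -r name apply sync || [ -n "$name" ]; do
    [ -z "$name" ] && continue
    if (( apply > 0 || sync > 0 )); then
      printf '%s apply_errors=%s sync_errors=%s\n' "$name" "$apply" "$sync"
      found=1
    fi
  done
  return "$found"
}

# Usage on subscriber B:
# ./soft/bin/ud_sql -p 6001 d1 -At \
#   -c 'SELECT subname, apply_error_count, sync_error_count FROM pg_stat_subscription_stats' |
#   check_sub_errors || echo "logical replication errors on B" >&2
```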
SELECT * FROM pg_replication_origin_status;
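pg_replication_origin_status is an important system view for tracking logical replication origins. It shows the replay progress of each replication origin.
Column | Type | Description |
---|---|---|
local_id | oid | Internal identifier of the replication origin |
external_id | text | External name of the replication origin (for example pg_24579) |
remote_lsn | pg_lsn | Publisher-side LSN up to which changes have been replicated |
local_lsn | pg_lsn | Local LSN at which that remote_lsn was applied |
SELECT * FROM pg_stat_subscription; -- view subscription worker status
SELECT * FROM pg_subscription; -- view subscription definitions
SELECT * FROM pg_subscription_rel; -- view the per-table state of each subscription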
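pg_subscription_rel is the system catalog that tracks the tables in each logical replication subscription. It records the state of every table in each subscription.
Column | Type | Description |
---|---|---|
srsubid | oid | OID of the subscription (references pg_subscription.oid) |
srrelid | oid | OID of the table (references pg_class.oid) |
srsubstate | char | Table state code: i = initializing, d = data is being copied, f = table copy finished, s = synchronized, r = ready (receiving changes normally) |
srsublsn | pg_lsn | LSN at which the table reached the s or r state (null otherwise) |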
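Each subscribed table's row in pg_subscription_rel should end up in state r. A table stuck in any other state points to a synchronization problem. The sketch below turns the single-letter codes into readable text. describe_srsubstate is a hypothetical helper. The code meanings come from the PostgreSQL 16 pg_subscription_rel documentation and are assumed to apply to UnvDB. ud_sql is assumed to accept psql's -A/-t options.

```shell
#!/usr/bin/env bash
# Hypothetical helper: translate a pg_subscription_rel.srsubstate code into
# a description (meanings as documented for PostgreSQL 16).
describe_srsubstate() {
  case "$1" in
    i) echo "initialize" ;;
    d) echo "data is being copied" ;;
    f) echo "finished table copy" ;;
    s) echo "synchronized" ;;
    r) echo "ready (normal replication)" ;;
    *) echo "unknown state: $1"; return 1 ;;
  esac
}

# Usage on subscriber B:
# ./soft/bin/ud_sql -p 6001 d1 -At \
#   -c 'SELECT srrelid::regclass, srsubstate FROM pg_subscription_rel' |
#   while IFS='|' read -r rel st; do echo "$rel: $(describe_srsubstate "$st")"; done
```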
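ALTER SUBSCRIPTION sub_name DISABLE; -- pause the subscription
ALTER SUBSCRIPTION sub_name ENABLE; -- resume the subscription
ALTER SUBSCRIPTION sub_name ADD PUBLICATION another_pub; -- add another publication to an existing subscription
SELECT pg_replication_origin_advance('pg_24579', '0/50576B9'::pg_lsn); -- set the replication origin's progress (where the apply worker resumes)
pg_replication_origin_advance is an important function for managing logical replication progress. It lets you manually set the progress position of a replication origin.
Function signature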
pg_replication_origin_advance(origin_name text, lsn pg_lsn) RETURNS void
Parameters
origin_name: name of the replication origin (maximum length 63 bytes)
lsn: the new log sequence number (LSN) position to set