Logical publication and subscription

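Physical replication slots are typically used for primary/standby replication. Logical replication can publish selected tables and selected operations, and the subscriber replays those changes, which gives selective, on-demand data synchronization.
This section walks through logical publication and subscription hands-on and covers the following key points: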

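  • Primary, standby, and cascading standby

  • Physical replication slots

  • Logical replication slots

  • Publishing from primary 1

  • Subscribing on primary 2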

Environment architecture

Primary A: 192.168.29.51:5001 –> Standby A1: 192.168.29.51:5002 –> Standby A2 (publisher): 192.168.29.51:5003 –> Primary B (subscriber): 192.168.29.51:6001

  • Note: publishing from a standby is supported in version 24 and later; earlier versions can only publish from a primary.

Goal

  • Replicate part of node A's data (selected tables) to node B through the cascading standby A2

Configuration steps

Install node A

[unvdb@local-dev51 soft]$ pwd
/data/aiops2/data/24/soft
[unvdb@local-dev51 soft]$ ./setup.sh
===============================================================================
Choose Data Folder
------------------

Please choose a data folder for this installation.
The data folder must be empty and writable.

Default Data Folder: /data/aiops2/data/24/unvdb-data

Enter the absolute path, 
or press <ENTER> key to accept the default, 

    : /data/aiops2/data/24/5001


===============================================================================
Port
----

Please enter UnvDB listen port.

Port (Default: 5678): 5001


===============================================================================

Append the replication parameters

[unvdb@local-dev51 24]$ pwd
/data/aiops2/data/24

[unvdb@local-dev51 24]$ cat >> 5001/unvdbsvr.conf <<EOF
port = 5001
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
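The same five settings are appended to unvdbsvr.conf on every node in this section (5001 here, then 5002, 5003 and 6001). A minimal sketch of a helper that does this for any data directory; append_repl_conf is a hypothetical name, and it assumes the layout used here, where each data directory sits under /data/aiops2/data/24 and is named after its port:

```shell
# append_repl_conf DATADIR PORT
# Hypothetical helper (not part of UnvDB): appends the replication settings
# used in this section to DATADIR/unvdbsvr.conf.
append_repl_conf() {
  local datadir=$1 port=$2
  local conf="$datadir/unvdbsvr.conf"
  # Refuse to create a config file in a directory that does not exist.
  [ -d "$datadir" ] || { echo "no such data directory: $datadir" >&2; return 1; }
  cat >> "$conf" <<EOF
port = $port
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
}
```

Usage from /data/aiops2/data/24: append_repl_conf 5001 5001. Appending instead of editing in place works because, when a parameter appears more than once in the file, the last occurrence wins; wal_level only takes effect after a restart.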

Start the database

[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 5001 -l 5001.log
starting UnvDB as a demo version
waiting for server to start.... done
server started

Generate sample data

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001
unvdb=# create database d1;
CREATE DATABASE
unvdb=# \c d1
You are now connected to database "d1" as user "unvdb".
d1=# create table t1 (id int); -- t1 has no primary key
CREATE TABLE
d1=# insert into t1 (id) values(1);
INSERT 0 1
d1=# create table t2 (id SERIAL PRIMARY KEY,name text);
CREATE TABLE
d1=# insert into t2 (name) values ('a');
INSERT 0 1
d1=# select * from t1,t2;
 id | id | name 
----+----+------
  1 |  1 | a
(1 row)

Configure standby A1

[unvdb@local-dev51 24]$ ./soft/bin/ud_basebackup -p 5001 -D 5002 -Fp -Xs -P -R -S slot2 -C --checkpoint=fast
30685/30685 kB (100%), 1/1 tablespace
[unvdb@local-dev51 24]$ ls
5001  5001.log  5002  soft
[unvdb@local-dev51 24]$ cat >> 5002/unvdbsvr.conf <<EOF
port = 5002
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 5002 -l 5002.log

Configure standby A2 (cascading from A1)

[unvdb@local-dev51 24]$ ./soft/bin/ud_basebackup -p 5002 -D 5003 -Fp -Xs -P -R -S slot3 -C --checkpoint=fast
[unvdb@local-dev51 24]$ ls
5001  5001.log  5002  5002.log  5003
[unvdb@local-dev51 24]$ cat >> 5003/unvdbsvr.conf <<EOF
port = 5003
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
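
Check 5003/unvdbsvr.auto.conf, which ud_basebackup -R wrote. Because the base backup was taken from port 5002 with -S slot3, primary_conninfo already points at A1 and primary_slot_name is slot3:

[unvdb@local-dev51 24]$ vi 5003/unvdbsvr.auto.conf
primary_conninfo = 'user=unvdb passfile=''/data/aiops2/data/appdb/.pgpass'' channel_binding=prefer port=5002 sslmode=prefer sslcompression=0 sslcertmode=allow sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=disable krbsrvname=unvdbsvr gssdelegation=0 target_session_attrs=any load_balance_hosts=disable'  # the upstream node here is 5002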
primary_slot_name = 'slot3'
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 5003 -l 5003.log
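To confirm from the shell which node a standby follows, the port can be read out of primary_conninfo. A small sketch; upstream_port is a hypothetical helper, and it assumes primary_conninfo sits on a single line in the key=value form shown above:

```shell
# upstream_port CONFFILE
# Hypothetical helper (not part of UnvDB): prints the port= value of the last
# primary_conninfo line in CONFFILE. The last line is used because a later
# setting overrides an earlier one.
upstream_port() {
  grep -E '^[[:space:]]*primary_conninfo[[:space:]]*=' "$1" |
    tail -n 1 |
    grep -oE "(^|[[:space:]'])port=[0-9]+" |
    head -n 1 |
    sed 's/.*port=//'
}
```

For A2, upstream_port 5003/unvdbsvr.auto.conf should print 5002; for A1 it should print 5001.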

Verify physical replication status

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from pg_replication_slots';
 slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
 slot2     |        | physical  |        |          | f         | t      |      49839 |      |              | 0/3000348   |                     | reserved   |               | f         | 
(1 row)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5002 d1 -c'select * from pg_replication_slots';
 slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
 slot3     |        | physical  |        |          | f         | t      |      50438 |      |              | 0/3000348   |                     | reserved   |               | f         | 
(1 row)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5003 d1 -c'select * from pg_replication_slots';
 slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
(0 rows)
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c"insert into t2(name) values ('b')";
INSERT 0 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t2';
 id | name 
----+------
  1 | a
  2 | b
(2 rows)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5002 d1 -c'select * from t2';
 id | name 
----+------
  1 | a
  2 | b
(2 rows)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5003 d1 -c'select * from t2';
 id | name 
----+------
  1 | a
  2 | b
(2 rows)
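The per-node queries above can be folded into one check. A sketch, assuming ud_sql accepts psql's -A (unaligned) and -t (tuples only) options; same_result and the UD_SQL variable (client path, defaulting to ./soft/bin/ud_sql) are hypothetical names:

```shell
# same_result QUERY PORT...
# Hypothetical helper (not part of UnvDB): runs QUERY against database d1 on
# each PORT and returns 0 only if every node returns the same rows as the
# first one, 1 on a mismatch, 2 if a query fails.
same_result() {
  local query=$1
  shift
  # UD_SQL (hypothetical) overrides the client path.
  local ud_sql=${UD_SQL:-./soft/bin/ud_sql}
  local first="" out port have_first=0
  for port in "$@"; do
    out=$("$ud_sql" -p "$port" d1 -At -c "$query") || return 2
    if [ "$have_first" -eq 0 ]; then
      first=$out
      have_first=1
    elif [ "$out" != "$first" ]; then
      echo "port $port differs from port $1" >&2
      return 1
    fi
  done
  return 0
}
```

Include ORDER BY so that row order is deterministic, for example: same_result 'select * from t2 order by id' 5001 5002 5003.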

Install node B

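Take a base backup of node A directly. Without -R, B is not configured as a standby and starts as an independent primary.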

[unvdb@local-dev51 24]$ ./soft/bin/ud_basebackup -p 5001 -D 6001 -Fp -Xs -P --checkpoint=fast
[unvdb@local-dev51 24]$ cat >> 6001/unvdbsvr.conf <<EOF
port = 6001
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
hot_standby = on
EOF
[unvdb@local-dev51 24]$ ./soft/bin/ud_ctl start -D 6001 -l 6001.log
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t2;'
 id | name 
----+------
  1 | a
  2 | b
(2 rows)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'create table t3(id int);'
CREATE TABLE
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'insert into t3(id) values(1);'
INSERT 0 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t3;'
 id 
----
(0 rows)

Create the publication on node A

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 

d1=# CREATE PUBLICATION pub_name FOR TABLE t1,t2 WITH (publish = 'insert, update, delete'); -- create publication pub_name, publishing only INSERT/UPDATE/DELETE on tables t1 and t2
CREATE PUBLICATION

d1=# SELECT * FROM pg_publication; -- list all publications
  oid  | pubname  | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot 
-------+----------+----------+--------------+-----------+-----------+-----------+-------------+------------
 16401 | pub_name |       10 | f            | t         | t         | t         | f           | f
(1 row)

Create the subscription on node B

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 
ud_sql (24.2)
Type "help" for help.

d1=# CREATE SUBSCRIPTION sub_incremental CONNECTION 'host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1' PUBLICATION pub_name WITH (slot_name = 'slot6001', create_slot = true, copy_data = false); -- create subscription sub_incremental: connect to publisher A2 (port 5003), subscribe to pub_name, create logical slot slot6001 automatically, and do not copy existing data
NOTICE:  created replication slot "slot6001" on publisher
CREATE SUBSCRIPTION

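At this point CREATE SUBSCRIPTION may hang: creating a logical slot on a standby (A2) waits until the primary writes a running-transactions snapshot to WAL, so that logical decoding on A2 can find a consistent starting point. In a second session, run pg_log_standby_snapshot() on primary A to write one: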

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001
unvdb=# SELECT pg_log_standby_snapshot();
 pg_log_standby_snapshot 
-------------------------
 0/50040C8
(1 row)
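The two-session procedure above can also be scripted. A sketch that starts CREATE SUBSCRIPTION in the background and then logs a standby snapshot on the primary; create_sub_via_standby, UD_SQL and SNAPSHOT_DELAY are hypothetical names. The 2-second default delay is a guess: the snapshot only helps if it is written after slot creation on A2 has started waiting, so if the command still hangs, run pg_log_standby_snapshot() again.

```shell
# create_sub_via_standby SUB_PORT PRIMARY_PORT SQL
# Hypothetical helper (not part of UnvDB): runs SQL (a CREATE SUBSCRIPTION
# statement) on the subscriber in the background, then asks the primary to
# write a standby snapshot so the logical slot on the standby can be created.
create_sub_via_standby() {
  local sub_port=$1 primary_port=$2 sql=$3
  # UD_SQL (hypothetical) overrides the client path.
  local ud_sql=${UD_SQL:-./soft/bin/ud_sql}
  local pid
  "$ud_sql" -p "$sub_port" d1 -c "$sql" &
  pid=$!
  # Give the subscriber time to start creating the slot on the standby.
  sleep "${SNAPSHOT_DELAY:-2}"
  "$ud_sql" -p "$primary_port" -c 'SELECT pg_log_standby_snapshot();' || return 1
  # Propagate the exit status of CREATE SUBSCRIPTION.
  wait "$pid"
}
```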

Query the subscription on node B

d1=# SELECT * FROM pg_subscription;
  oid  | subdbid | subskiplsn |     subname     | subowner | subenabled | subbinary | substream | subtwophasestate | subdisableonerr | subpasswordrequired | subrunasowner |                       subconninfo                        | subslotname | subsynccommit | subpublications | suborigin 
-------+---------+------------+-----------------+----------+------------+-----------+-----------+------------------+-----------------+---------------------+---------------+----------------------------------------------------------+-------------+---------------+-----------------+-----------
 24579 |   16384 | 0/0        | sub_incremental |       10 | t          | f         | f         | d                | f               | t                   | f             | host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1 | slot6001    | off           | {pub_name}      | any
(1 row)

The logical replication slot slot6001 has now been created automatically on standby A2:

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5003 d1 -c'SELECT * FROM pg_replication_slots;'
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
 slot6001  | pgoutput | logical   |  16384 | d1       | f         | t      |      52656 |      |          749 | 0/5004178   | 0/50041B0           | reserved   |               | f         | f
(1 row)

Verify data synchronization

Existing data on node A

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t1;select * from t2;'
 id 
----
  1
(1 row)

 id | name 
----+------
  1 | a
  2 | b
(2 rows)

Insert data on node A

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c"insert into t1(id) values(2);insert into t2(name) values('c');"
INSERT 0 1
INSERT 0 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t1;select * from t2;'
 id 
----
  1
  2
(2 rows)

 id | name 
----+------
  1 | a
  2 | b
  3 | c
(3 rows)

Data on node B, already replicated from node A

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t1;select * from t2;'
 id 
----
  1
  2
(2 rows)

 id | name 
----+------
  1 | a
  2 | b
  3 | c
(3 rows)

Verify deletes

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'delete from t2 where id=1'
DELETE 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t2;'
 id | name 
----+------
  2 | b
  3 | c
  4 | d
(3 rows)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t2;'
 id | name 
----+------
  2 | b
  3 | c
  4 | d
(3 rows)

Delete from a table without a primary key

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'delete from t1 where id=1'
ERROR:  cannot delete from table "t1" because it does not have a replica identity and publishes deletes
HINT:  To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.

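The DELETE is rejected because t1 has no primary key, and therefore no replica identity, while the publication publishes deletes (an UPDATE is rejected the same way).
Either add a primary key: ALTER TABLE t1 ADD PRIMARY KEY (id);
or set the table's replica identity to FULL, which writes the entire old row to WAL for every UPDATE and DELETE: ALTER TABLE t1 REPLICA IDENTITY FULL;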
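To find such tables before a DELETE or UPDATE fails, the catalogs can be queried up front. A sketch that wraps the query in a shell function; missing_replica_identity and UD_SQL are hypothetical names, -At is assumed to behave as in psql, and the publication name is interpolated directly into the SQL, so it should only be called with trusted names:

```shell
# missing_replica_identity PORT PUBNAME
# Hypothetical helper (not part of UnvDB): lists tables in PUBNAME whose
# replica identity is NOTHING, or DEFAULT without a primary key.
missing_replica_identity() {
  local port=$1 pub=$2
  # UD_SQL (hypothetical) overrides the client path.
  "${UD_SQL:-./soft/bin/ud_sql}" -p "$port" d1 -At -c "
SELECT format('%I.%I', pt.schemaname, pt.tablename)
FROM pg_publication_tables pt
JOIN pg_namespace n ON n.nspname = pt.schemaname
JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = pt.tablename
WHERE pt.pubname = '$pub'
  AND (c.relreplident = 'n'
       OR (c.relreplident = 'd'
           AND NOT EXISTS (SELECT 1 FROM pg_index i
                           WHERE i.indrelid = c.oid AND i.indisprimary)))
ORDER BY 1"
}
```

Run against node A before the ALTER TABLE above, missing_replica_identity 5001 pub_name should list public.t1 but not t2, which has a primary key.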

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'ALTER TABLE t1 REPLICA IDENTITY FULL;'
ALTER TABLE
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'delete from t1 where id=1'
DELETE 1
[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 5001 d1 -c'select * from t1;'
 id 
----
  2
  3
(2 rows)

[unvdb@local-dev51 24]$ ./soft/bin/ud_sql -p 6001 d1 -c'select * from t1;'
 id 
----
  2
  3
(2 rows)

Add table t4 to the publication

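Publisher side (run on primary A, port 5001; standby A2 is read-only and receives the new table and the publication change through physical replication)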

d1=# create table t4 (id SERIAL  PRIMARY KEY,name text);
CREATE TABLE
d1=# insert into t4(name) values('a');
INSERT 0 1

d1=# ALTER PUBLICATION pub_name ADD TABLE t4;

Subscriber side (node B, port 6001)

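d1=# create table t4 (id SERIAL PRIMARY KEY,name text); -- create the same table definition
CREATE TABLE
d1=# ALTER SUBSCRIPTION sub_incremental REFRESH PUBLICATION; -- refresh so the subscription picks up t4
d1=# ALTER SUBSCRIPTION sub_incremental DISABLE; -- stop the subscription
d1=# ALTER SUBSCRIPTION sub_incremental ENABLE; -- start the subscription again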
d1=# select * from t4;
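The publisher and subscriber steps for adding a table can be combined. A sketch, not a tested procedure: add_table_to_subscription and UD_SQL are hypothetical names, the same DDL is run on both sides, and the DISABLE/ENABLE round trip from the transcript is left out. REFRESH PUBLICATION copies the new table's existing rows by default (its copy_data option defaults to true), even though the subscription itself was created with copy_data = false. Table synchronization also creates a slot on A2, so it may need a standby snapshot on primary A in the same way CREATE SUBSCRIPTION did.

```shell
# add_table_to_subscription PUB_PORT SUB_PORT PUB SUB TABLE DDL
# Hypothetical helper (not part of UnvDB). PUB_PORT must be the writable
# primary (A, 5001), not the read-only standby publisher A2.
add_table_to_subscription() {
  local pub_port=$1 sub_port=$2 pub=$3 sub=$4 table=$5 ddl=$6
  # UD_SQL (hypothetical) overrides the client path.
  local ud_sql=${UD_SQL:-./soft/bin/ud_sql}
  # Publisher: create the table and add it to the publication.
  "$ud_sql" -p "$pub_port" d1 -c "$ddl" || return 1
  "$ud_sql" -p "$pub_port" d1 -c "ALTER PUBLICATION $pub ADD TABLE $table" || return 1
  # Subscriber: create the same table, then refresh so the subscription sees it.
  "$ud_sql" -p "$sub_port" d1 -c "$ddl" || return 1
  "$ud_sql" -p "$sub_port" d1 -c "ALTER SUBSCRIPTION $sub REFRESH PUBLICATION" || return 1
}
```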

Simulate a replication failure and resolve it

On the subscriber, manually insert a row with id=4 to simulate a primary key conflict

d1=# select * from t4;
 id | name 
----+------
  1 | a
  2 | b
  3 | b

d1=# insert into t4 (id,name) values(4,'A');
INSERT 0 1
d1=# select * from t4;
 id | name 
----+------
  1 | a
  2 | b
  3 | b
  4 | A
(4 rows)

On the publisher, insert a row as well; it also gets id=4

d1=# insert into t4 (name) values('A0');
INSERT 0 1
d1=# select * from t4;
 id | name 
----+------
  1 | a
  2 | b
  3 | b
  4 | A0
(4 rows)

Replication now stops, and the subscriber logs:

2024-07-18 14:27:11.856 CST [60946] LOG:  logical replication apply worker for subscription "sub_incremental" has started
2024-07-18 14:27:11.873 CST [60946] ERROR:  duplicate key value violates unique constraint "t4_pkey"
2024-07-18 14:27:11.873 CST [60946] DETAIL:  Key (id)=(4) already exists.
2024-07-18 14:27:11.873 CST [60946] CONTEXT:  processing remote data for replication origin "pg_24579" during message type "INSERT" for replication target relation "public.t4" in transaction 764, finished at 0/50576B8
2024-07-18 14:27:11.875 CST [51568] LOG:  background worker "logical replication worker" (PID 60946) exited with exit code 1

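Publisher (A2, port 5003) log, written each time the apply worker reconnects and decoding restarts for slot6001: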

2024-07-18 14:28:01.909 CST [61226] LOG:  0/5057390 has been already streamed, forwarding to 0/5057518
2024-07-18 14:28:01.909 CST [61226] STATEMENT:  START_REPLICATION SLOT "slot6001" LOGICAL 0/5057390 (proto_version '4', origin 'any', publication_names '"pub_name"')
2024-07-18 14:28:01.911 CST [61226] LOG:  starting logical decoding for slot "slot6001"
2024-07-18 14:28:01.911 CST [61226] DETAIL:  Streaming transactions committing after 0/5057518, reading WAL from 0/5057478.
2024-07-18 14:28:01.911 CST [61226] STATEMENT:  START_REPLICATION SLOT "slot6001" LOGICAL 0/5057390 (proto_version '4', origin 'any', publication_names '"pub_name"')
2024-07-18 14:28:01.912 CST [61226] LOG:  logical decoding found consistent point at 0/5057478
2024-07-18 14:28:01.912 CST [61226] DETAIL:  There are no running transactions.
2024-07-18 14:28:01.912 CST [61226] STATEMENT:  START_REPLICATION SLOT "slot6001" LOGICAL 0/5057390 (proto_version '4', origin 'any', publication_names '"pub_name"')
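
Subscription statistics on node B; each failed apply attempt increments apply_error_count:

d1=# SELECT * FROM pg_stat_subscription_stats; -- check subscription statistics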
 subid |     subname     | apply_error_count | sync_error_count | stats_reset 
-------+-----------------+-------------------+------------------+-------------
 24579 | sub_incremental |                56 |                0 | 
(1 row)

Skip the failing transaction manually

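d1=# SELECT * FROM pg_subscription; -- look up the subscription OID
d1=# SELECT pg_replication_origin_advance('pg_24579', '0/50576B9'::pg_lsn); -- move the origin past the failed transaction
The origin name pg_24579 is pg_ followed by the subscription OID (24579), and the LSN is the finish LSN from the error log (`finished at 0/50576B8`) plus 1. If the apply worker holds the origin at that moment, the call fails; in that case disable the subscription first and enable it again afterwards.
  • Note: replication then resumes, but the skipped transaction is never applied on the subscriber, so the data may be inconsistent and must be repaired manually.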
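The +1 step can be done in the shell. An LSN is a 64-bit WAL position printed as two 32-bit hexadecimal halves, HIGH/LOW, so adding 1 must carry from the low half into the high half. A minimal sketch; lsn_plus_one is a hypothetical helper name. (The \h ALTER SUBSCRIPTION output later in this section also lists a SKIP form; in upstream PostgreSQL it is written ALTER SUBSCRIPTION sub_incremental SKIP (lsn = '0/50576B8') and takes the finish LSN itself, without the +1.)

```shell
# lsn_plus_one LSN
# Hypothetical helper (not part of UnvDB): adds 1 to an LSN written as
# HIGH/LOW hex halves, carrying into HIGH, and prints it in the same form.
lsn_plus_one() {
  [[ $1 =~ ^[0-9A-Fa-f]+/[0-9A-Fa-f]+$ ]] || return 1
  local hi=${1%/*} lo=${1#*/}
  local v=$(( (16#$hi << 32) + 16#$lo + 1 ))
  printf '%X/%X\n' $(( (v >> 32) & 0xFFFFFFFF )) $(( v & 0xFFFFFFFF ))
}
```

lsn_plus_one 0/50576B8 prints 0/50576B9, the value passed to pg_replication_origin_advance above.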

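Caveats

  • Both the publisher and the subscriber normally accept writes, so local writes on the subscriber can conflict with replicated changes and leave the data inconsistent (as in the id=4 conflict above)

  • Publication/subscription gives no guarantee on replication lag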

Other operations

Publication commands

d1=# \h alter PUBLICATION
Command:     ALTER PUBLICATION
Description: change the definition of a publication
Syntax:
ALTER PUBLICATION name ADD publication_object [, ...]
ALTER PUBLICATION name SET publication_object [, ...]
ALTER PUBLICATION name DROP publication_object [, ...]
ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] )
ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION name RENAME TO new_name

where publication_object is one of:

    TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ]
    TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ]

d1=# \h drop publication 
Command:     DROP PUBLICATION
Description: remove a publication
Syntax:
DROP PUBLICATION [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]    

CREATE PUBLICATION pub_name FOR ALL TABLES WITH (publish = 'insert, update, delete, truncate'); -- publish these operations for all tables
SELECT * FROM pg_publication_tables; -- list the published tables

Subscription commands

d1=# \h ALTER SUBSCRIPTION
Command:     ALTER SUBSCRIPTION
Description: change the definition of a subscription
Syntax:
ALTER SUBSCRIPTION name CONNECTION 'conninfo'
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ADD PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name DROP PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] )
ALTER SUBSCRIPTION name SKIP ( skip_option = value )
ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION name RENAME TO new_name

d1=# \h drop SUBSCRIPTION
Command:     DROP SUBSCRIPTION
Description: remove a subscription
Syntax:
DROP SUBSCRIPTION [ IF EXISTS ] name [ CASCADE | RESTRICT ]

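CREATE SUBSCRIPTION sub_incremental CONNECTION 'host=127.0.0.1 port=5003 user=unvdb password=1 dbname=d1' PUBLICATION pub_name WITH (slot_name = 'slot_standby', create_slot = true, copy_data = false, enabled = false, origin = none); -- create a subscription

A subscription has no filter option; row filters are defined per table on the publication (see the WHERE clause in the ALTER PUBLICATION syntax above), for example: CREATE PUBLICATION pub_name FOR TABLE t2 WHERE (id > 100);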

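| Parameter | Type | Description | Default |
| - | - | - | - |
| CONNECTION | string | libpq connection string to the publisher (key=value pairs as above, or a URI) | - |
| PUBLICATION | text | Names of the publications to subscribe to | - |
| copy_data | boolean | Whether to copy existing data when the subscription starts | true |
| create_slot | boolean | Whether to create the replication slot on the publisher | true |
| enabled | boolean | Whether to enable the subscription immediately | true |
| slot_name | text | Name of the replication slot to use | subscription name |
| synchronous_commit | enum | synchronous_commit setting used by the subscription's apply workers | off |
| connect | boolean | Whether to connect to the publisher when the subscription is created | true |
| binary | boolean | Whether to transfer data in binary format | false |
| streaming | enum | Whether to stream large in-progress transactions (off/on/parallel) | off |
| two_phase | boolean | Whether to enable two-phase commit | false |
| disable_on_error | boolean | Whether to disable the subscription automatically on error | false |
| run_as_owner | boolean | Whether to apply changes as the subscription owner; if false, each table's owner is used | false |
| password_required | boolean | Whether connections to the publisher must use password authentication | true |
| origin | text | Request only changes that have no replication origin (none) or all changes (any) | any |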

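SELECT * FROM pg_stat_subscription_stats; -- subscription statistics

pg_stat_subscription_stats reports error statistics for each logical replication subscription and helps monitor subscription health.

| Column | Type | Description |
| - | - | - |
| subid | oid | OID of the subscription |
| subname | name | Name of the subscription |
| apply_error_count | bigint | Number of errors while applying changes |
| sync_error_count | bigint | Number of errors during initial table synchronization |
| stats_reset | timestamp with time zone | Time the statistics were last reset |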
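For routine monitoring, the two counters can be checked from the shell. A sketch, assuming ud_sql accepts psql's -At options; sub_errors and UD_SQL are hypothetical names, and the subscription name is interpolated into the SQL, so it should only be called with trusted names. The counters are cumulative, so after fixing a problem compare against the previous value or reset them (upstream PostgreSQL provides pg_stat_reset_subscription_stats for this).

```shell
# sub_errors PORT SUBNAME
# Hypothetical helper (not part of UnvDB): prints
# apply_error_count + sync_error_count for SUBNAME. Returns 0 if the total is
# zero, 1 if there were errors, 2 if the query failed or SUBNAME was not found.
sub_errors() {
  local port=$1 sub=$2 n
  # UD_SQL (hypothetical) overrides the client path.
  n=$("${UD_SQL:-./soft/bin/ud_sql}" -p "$port" d1 -At -c \
    "SELECT apply_error_count + sync_error_count FROM pg_stat_subscription_stats WHERE subname = '$sub'") || return 2
  if [ -z "$n" ]; then
    echo "subscription $sub not found" >&2
    return 2
  fi
  echo "$n"
  [ "$n" -eq 0 ]
}
```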

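SELECT * FROM pg_replication_origin_status;
pg_replication_origin_status is a system view that shows the replay progress of each replication origin on the local node.

| Column | Type | Description |
| - | - | - |
| local_id | oid | Internal identifier of the replication origin |
| external_id | text | Name of the replication origin (for example pg_24579) |
| remote_lsn | pg_lsn | LSN on the origin node up to which changes have been replicated |
| local_lsn | pg_lsn | Local LSN at which remote_lsn was replicated |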

SELECT * FROM pg_stat_subscription; -- status of the subscription workers
SELECT * FROM pg_subscription; -- subscription definitions

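SELECT * FROM pg_subscription_rel; -- per-table state of each subscription
pg_subscription_rel is a system catalog that records the state of each table in each subscription.

| Column | Type | Description |
| - | - | - |
| srsubid | oid | OID of the subscription (references pg_subscription.oid) |
| srrelid | oid | OID of the table (references pg_class.oid) |
| srsubstate | char | State code: i = initialize, d = data is being copied, f = finished table copy, s = synchronized, r = ready (normal replication) |
| srsublsn | pg_lsn | Remote LSN of the state change for states s and r; null otherwise |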
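When scripting checks, the one-letter state codes can be decoded in the shell. A sketch; rel_state, show_rel_states and UD_SQL are hypothetical names, and -At and -F (field separator) are assumed to behave as in psql:

```shell
# rel_state CODE
# Hypothetical helper (not part of UnvDB): describes a
# pg_subscription_rel.srsubstate code.
rel_state() {
  case $1 in
    i) echo "initialize" ;;
    d) echo "data is being copied" ;;
    f) echo "finished table copy" ;;
    s) echo "synchronized" ;;
    r) echo "ready (normal replication)" ;;
    *) echo "unknown state: $1" >&2; return 1 ;;
  esac
}

# show_rel_states PORT
# Hypothetical helper: lists every subscribed table with its decoded state.
show_rel_states() {
  "${UD_SQL:-./soft/bin/ud_sql}" -p "$1" d1 -At -F ' ' \
    -c 'SELECT srrelid::regclass, srsubstate FROM pg_subscription_rel ORDER BY 1' |
    while read -r rel st; do
      echo "$rel: $(rel_state "$st")"
    done
}
```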

ALTER SUBSCRIPTION sub_name DISABLE; -- pause the subscription
ALTER SUBSCRIPTION sub_name ENABLE; -- resume the subscription
ALTER SUBSCRIPTION sub_name ADD PUBLICATION another_pub; -- add another publication to an existing subscription

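SELECT pg_replication_origin_advance('pg_24579', '0/50576B9'::pg_lsn); -- set the replication origin's progress
pg_replication_origin_advance manually sets the replay progress of a replication origin; this is how the failed transaction was skipped above.
Signature: pg_replication_origin_advance(origin_name text, lsn pg_lsn) RETURNS void
Parameters:

  • origin_name: name of the replication origin (at most 63 bytes)

  • lsn: the new log sequence number (LSN) to set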
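Putting the skip procedure together: upstream PostgreSQL's documentation advises disabling the subscription before calling pg_replication_origin_advance, so that the apply worker is not holding the origin. A sketch under that assumption; skip_remote_txn and UD_SQL are hypothetical names, -v ON_ERROR_STOP=1 is assumed to behave as in psql (stop at the first error, leaving the subscription disabled for inspection), and the arguments are interpolated into SQL unescaped. Disabling is not instantaneous, so a retry may be needed if the origin is still reported as active.

```shell
# skip_remote_txn PORT SUBNAME ORIGIN LSN
# Hypothetical helper (not part of UnvDB): disables SUBNAME, moves ORIGIN's
# progress to LSN (the failed transaction's finish LSN + 1), then enables
# the subscription again.
skip_remote_txn() {
  local port=$1 sub=$2 origin=$3 lsn=$4
  # UD_SQL (hypothetical) overrides the client path.
  "${UD_SQL:-./soft/bin/ud_sql}" -p "$port" d1 -v ON_ERROR_STOP=1 <<SQL
ALTER SUBSCRIPTION $sub DISABLE;
SELECT pg_replication_origin_advance('$origin', '$lsn'::pg_lsn);
ALTER SUBSCRIPTION $sub ENABLE;
SQL
}
```

For the conflict above: skip_remote_txn 6001 sub_incremental pg_24579 0/50576B9.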