decoderbufs

重新生成原型

改变 ProtoBuf 定义之后 (_proto/pg_logicaldec.proto):

cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..

修改配置文件


修改 unvdbsvr.conf:

```bash
# MODULES
shared_preload_libraries = 'decoderbufs'

# REPLICATION
wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8             # max number of walsender processes (change requires restart)
wal_keep_size = 4           # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s       # in milliseconds; 0 disables
max_replication_slots = 4       # max number of replication slots (change requires restart)

还需要为连接数据库的用户添加复制权限(replication permission)。这可以通过修改 pg_hba.conf 文件来实现, 需要其他验证方式请修改trust, trust一般只用于本地开发测试。

local   replication     <youruser>                          trust
host    replication     <youruser>  127.0.0.1/32            trust
host    replication     <youruser>  ::1/128                 trust

之后重启unvdb

使用

-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');

-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
CREATE TABLE test_table (
  id SERIAL PRIMARY KEY,
  name TEXT
);

INSERT INTO test_table (name) VALUES ('Alice'), ('Bob');

-- peek at WAL changes using decoderbufs debug mode for SQL console, 0 rows
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');

-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');

-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';

SELECT pg_drop_replication_slot('decoderbufs_demo');

如果你对表执行了 UPDATE 或 DELETE 操作,却没有在logical replication的输出中看到这些变更,确保你已经正确设置了 REPLICA IDENTITY。 用 decoderbufs 插件生成的 WAL 变更记录(二进制 protobuf 数据),会被连接器读取和解析,进而将变更数据流转到如 Kafka 等下游系统中,用于 数据同步、变更捕获(CDC)、实时 ETL 等场景。

客户端测试

ud_recvlogical -d unvdb -U unvdb --slot=raw_slot --start --plugin=decoder_raw --file=- --no-loop

类型映射

Unvdb 和 Decoderbuf的类型映射

Unvdb Type OID Decoderbuf Field
BOOLOID datum_boolean
INT2OID datum_int32
INT4OID datum_int32
INT8OID datum_int64
OIDOID datum_int64
FLOAT4OID datum_float
FLOAT8OID datum_double
NUMERICOID datum_double
CHAROID datum_string
VARCHAROID datum_string
BPCHAROID datum_string
TEXTOID datum_string
JSONOID datum_string
XMLOID datum_string
UUIDOID datum_string
TIMESTAMPOID datum_string
TIMESTAMPTZOID datum_string
BYTEAOID datum_bytes
POINTOID datum_point
PostGIS geometry datum_point
PostGIS geography datum_point