decoderbufs
重新生成原型
改变 ProtoBuf 定义之后 (_proto/pg_logicaldec.proto):
cd proto
protoc-c --c_out=../src/proto pg_logicaldec.proto
cd ..
修改配置文件
修改 unvdbsvr.conf:
```bash
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 8 # max number of walsender processes (change requires restart)
wal_keep_size = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
还需要为连接数据库的用户添加复制权限(replication permission)。这可以通过修改 pg_hba.conf 文件来实现, 需要其他验证方式请修改trust, trust一般只用于本地开发测试。
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
之后重启unvdb
使用
-- can use SQL for demo purposes
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
CREATE TABLE test_table (
id SERIAL PRIMARY KEY,
name TEXT
);
INSERT INTO test_table (name) VALUES ('Alice'), ('Bob');
-- peek at WAL changes using decoderbufs debug mode for SQL console, 0 rows
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- get WAL changes using decoderbufs to update the WAL position
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
-- check the WAL position of logical replicators
select * from pg_replication_slots where slot_type = 'logical';
SELECT pg_drop_replication_slot('decoderbufs_demo');
如果你对表执行了 UPDATE 或 DELETE 操作,却没有在logical replication的输出中看到这些变更,确保你已经正确设置了 REPLICA IDENTITY。 用 decoderbufs 插件生成的 WAL 变更记录(二进制 protobuf 数据),会被连接器读取和解析,进而将变更数据流转到如 Kafka 等下游系统中,用于 数据同步、变更捕获(CDC)、实时 ETL 等场景。
客户端测试
ud_recvlogical -d unvdb -U unvdb --slot=raw_slot --start --plugin=decoder_raw --file=- --no-loop
类型映射
Unvdb 和 Decoderbuf的类型映射
| Unvdb Type OID | Decoderbuf Field |
|---|---|
| BOOLOID | datum_boolean |
| INT2OID | datum_int32 |
| INT4OID | datum_int32 |
| INT8OID | datum_int64 |
| OIDOID | datum_int64 |
| FLOAT4OID | datum_float |
| FLOAT8OID | datum_double |
| NUMERICOID | datum_double |
| CHAROID | datum_string |
| VARCHAROID | datum_string |
| BPCHAROID | datum_string |
| TEXTOID | datum_string |
| JSONOID | datum_string |
| XMLOID | datum_string |
| UUIDOID | datum_string |
| TIMESTAMPOID | datum_string |
| TIMESTAMPTZOID | datum_string |
| BYTEAOID | datum_bytes |
| POINTOID | datum_point |
| PostGIS geometry | datum_point |
| PostGIS geography | datum_point |