
PostgreSQL的逻辑复制不会自动同步DDL(第三方插件另说),因此在逻辑复制环境中,当表发布的表执行DDL的时候,如果不停止业务的话,publication(发布节点,主节点)和subscription(订阅节点,从节点)如何执行相关的DDL,才能确保复制正常运行,不受DDL的影响?
结论是:先在subscription从节点执行DDL,然后在publication主节点执行相关DDL,这样的话,复制不会受到任何影响。
在不停止业务的情况下,如果先在主节点DDL,那么就可能会往表的新增字段中写入数据,当WAL Sender进程解码出来新增的字段,通过apply worker进程在subscription上执行的时候,会因为找不到相关的字段报错。相反,如果先在subscription上执行相关DDL,修改表结构,此时apply worker中的数据并不包含新增字段的数据,是不影响复制的,然后在publication上执行相关DDL,这样就算是新增字段的数据立马写入数据,通过WAL Sender传递到subsacrption上可以完全正确地执行。
以下以t3表为例,t3表是一个发布订阅的表,现在要增加字段,也即执行DDL,可以先从subscription上执行,然后再publication执行,这样整个复制过程不会收到任何影响。
1,主节点逻辑复制槽以及publication信息
2,从节点subscription信息
3,主节点上写入数据
4,从节点正常复制
5,subscription从节点先执行DDL,与此同时,主节点执行DDL之前,t3上再次写入一条测试数据,同时可以正常复制主节点数据
6,publication主节点执行DDL,执行完往新字段写入数据
7,subscription从节点复制正常
主节点测试脚本
db01=# \x
Expanded display is on.
db01=#
db01=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+------------------------------
slot_name | pgstandby_slave01
plugin |
slot_type | physical
datoid |
database |
temporary | f
active | f
active_pid |
xmin |
catalog_xmin |
restart_lsn | 1/20518D30
confirmed_flush_lsn |
wal_status | reserved
safe_wal_size |
two_phase | f
conflicting |
-[ RECORD 2 ]-------+------------------------------
slot_name | db01_logic_replication_slot01
plugin | pgoutput
slot_type | logical
datoid | 16400
database | db01
temporary | f
active | t
active_pid | 33412
xmin |
catalog_xmin | 1171
restart_lsn | 1/214B8040
confirmed_flush_lsn | 1/214B8078
wal_status | reserved
safe_wal_size |
two_phase | f
conflicting | f
db01=#
db01=# \x off
Expanded display is off.
db01=#
db01=# select * from pg_catalog.pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+------------------------+----------+--------------+-----------+-----------+-----------+-------------+------------
41149 | master_db01_pulication | 10 | f | t | t | f | f | f
(1 row)
db01=#
db01=# SELECT oid, prpubid, prrelid::regclass FROM pg_publication_rel;
oid | prpubid | prrelid
-------+---------+---------
41150 | 41149 | t1
41151 | 41149 | t2
41158 | 41149 | t3
(3 rows)
db01=#
db01=# \d t3
Table "public.t3"
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+------------------------------
c1 | integer | | not null | generated always as identity
c2 | character varying(100) | | |
c3 | timestamp without time zone | | |
Indexes:
"t3_pkey" PRIMARY KEY, btree (c1)
Publications:
"master_db01_pulication"
db01=#
db01=# insert into t3(c1,c2,c3) values (1,'aaa',now());
ERROR: cannot insert a non-DEFAULT value into column "c1"
DETAIL: Column "c1" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
db01=#
db01=#
db01=#
db01=#
db01=# \d t3
Table "public.t3"
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+------------------------------
c1 | integer | | not null | generated always as identity
c2 | character varying(100) | | |
c3 | timestamp without time zone | | |
Indexes:
"t3_pkey" PRIMARY KEY, btree (c1)
Publications:
"master_db01_pulication"
db01=#
db01=#
db01=# insert into t3(c2,c3) values ('aaa',now());
INSERT 0 1
db01=#
db01=# select * from t3
db01-# ;
c1 | c2 | c3
----+-----+--------------------------
1 | aaa | 2025-06-26 14:31:50.1698
(1 row)
db01=#
db01=#
db01=# insert into t3(c2,c3) values ('bbb',now());
INSERT 0 1
db01=#
db01=# select * from t3;
c1 | c2 | c3
----+-----+----------------------------
1 | aaa | 2025-06-26 14:31:50.1698
2 | bbb | 2025-06-26 14:35:15.171404
(2 rows)
db01=# alter table t3 add c4 timestamp;
ALTER TABLE
db01=#
db01=# insert into t3(c2,c3) values ('ccc',now(),now());
ERROR: INSERT has more expressions than target columns
LINE 1: insert into t3(c2,c3) values ('ccc',now(),now());
^
db01=# insert into t3(c2,c3,c4) values ('ccc',now(),now());
INSERT 0 1
db01=#
db01=# select * from t3;
c1 | c2 | c3 | c4
----+-----+----------------------------+----------------------------
1 | aaa | 2025-06-26 14:31:50.1698 |
2 | bbb | 2025-06-26 14:35:15.171404 |
3 | ccc | 2025-06-26 14:37:43.571553 | 2025-06-26 14:37:43.571553
(3 rows)
db01=#
db01=#
从节点测试脚本
mydb01=# select * from pg_subscription;
-[ RECORD 1 ]-------+---------------------------------------------------------------------------------
oid | 16407
subdbid | 16388
subskiplsn | 0/0
subname | slave_db01_subscription
subowner | 10
subenabled | t
subbinary | f
substream | f
subtwophasestate | d
subdisableonerr | f
subpasswordrequired | t
subrunasowner | f
subconninfo | host=8.*.*.129 port=9000 dbname=db01 user=replica_user password=******
subslotname | db01_logic_replication_slot01
subsynccommit | off
subpublications | {master_db01_pulication}
suborigin | any
mydb01=# SELECT *, srrelid::regclass FROM pg_subscription_rel;
-[ RECORD 1 ]----------
srsubid | 16407
srrelid | 16402
srsubstate | r
srsublsn | 1/21166E58
srrelid | t3
-[ RECORD 2 ]----------
srsubid | 16407
srrelid | 16390
srsubstate | r
srsublsn | 1/2116F7E0
srrelid | t1
-[ RECORD 3 ]----------
srsubid | 16407
srrelid | 16396
srsubstate | r
srsublsn | 1/21171BF0
srrelid | t2
mydb01=#
mydb01=# \d t3
Table "public.t3"
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+------------------------------
c1 | integer | | not null | generated always as identity
c2 | character varying(100) | | |
c3 | timestamp without time zone | | |
Indexes:
"t3_pkey" PRIMARY KEY, btree (c1)
mydb01=#
mydb01=#
mydb01=# select * from t3;
-[ RECORD 1 ]----------------
c1 | 1
c2 | aaa
c3 | 2025-06-26 14:31:50.1698
mydb01=#
mydb01=# \x off
Expanded display is off.
mydb01=#
mydb01=#
mydb01=# select * from t3;
c1 | c2 | c3
----+-----+--------------------------
1 | aaa | 2025-06-26 14:31:50.1698
(1 row)
mydb01=#
mydb01=#
mydb01=#
mydb01=# \d t3
Table "public.t3"
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+------------------------------
c1 | integer | | not null | generated always as identity
c2 | character varying(100) | | |
c3 | timestamp without time zone | | |
Indexes:
"t3_pkey" PRIMARY KEY, btree (c1)
mydb01=#
mydb01=#
mydb01=# select * from t3;
c1 | c2 | c3
----+-----+--------------------------
1 | aaa | 2025-06-26 14:31:50.1698
(1 row)
mydb01=#
mydb01=#
mydb01=# alter table t3 add c4 timestamp;
ALTER TABLE
mydb01=#
mydb01=# select * from t3;
c1 | c2 | c3 | c4
----+-----+----------------------------+----
1 | aaa | 2025-06-26 14:31:50.1698 |
2 | bbb | 2025-06-26 14:35:15.171404 |
(2 rows)
mydb01=# select * from t3;
c1 | c2 | c3 | c4
----+-----+----------------------------+----------------------------
1 | aaa | 2025-06-26 14:31:50.1698 |
2 | bbb | 2025-06-26 14:35:15.171404 |
3 | ccc | 2025-06-26 14:37:43.571553 | 2025-06-26 14:37:43.571553
(3 rows)
mydb01=#