Steps for Running Table DDL Changes Under PostgreSQL Logical Replication


 

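PostgreSQL logical replication does not replicate DDL automatically (third-party extensions aside). Suppose DDL has to be run on a published table in a logical replication setup without stopping the application. In what order should the DDL be applied on the publisher (the node that owns the publication, here the primary) and on the subscriber (the node that owns the subscription, here the replica) so that replication keeps running and is not disrupted by the DDL?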

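Conclusion: for adding a column, as in the example below, run the DDL on the subscriber (replica) first and then run the same DDL on the publisher (primary). In this order, replication is not affected at all.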
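A minimal sketch of that order in SQL, assuming the table and column from the example below (t3, new column c4 of type timestamp) and that each half is run in its own psql session on the node named in the comment. IF NOT EXISTS is used only so that each step can be rerun safely; it is not part of the method itself.

```sql
-- Step 1: run on the SUBSCRIBER (replica) first, e.g. psql connected to mydb01.
ALTER TABLE public.t3 ADD COLUMN IF NOT EXISTS c4 timestamp;

-- Step 2: still on the subscriber, confirm the new column exists
-- before touching the publisher.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 't3'
ORDER BY ordinal_position;

-- Step 3: only then run the same DDL on the PUBLISHER (primary),
-- e.g. psql connected to db01.
ALTER TABLE public.t3 ADD COLUMN IF NOT EXISTS c4 timestamp;
```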

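With the application still running, if the DDL is applied on the primary first, new rows may start writing to the new column right away. When the walsender decodes these changes, which now include the new column, and the apply worker tries to apply them on the subscriber, it fails with an error because the column does not exist there. The failure does not actually depend on the new column being populated: with a publication that has no column list, as here, every change to the table made after the DDL carries the new column, so even rows that leave it NULL cannot be applied until the subscriber has the column too. Conversely, if the DDL is run on the subscriber first, the changes reaching the apply worker do not yet contain the new column, so they apply without problems (the extra column on the subscriber is simply left NULL). When the DDL is then run on the publisher, even changes that write to the new column immediately are sent by the walsender to the subscriber and applied correctly.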
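If the order is reversed by mistake, the symptom shows up on the subscriber, and the server log there records the actual error. The queries below are a sketch for spotting it from SQL; they assume PostgreSQL 15 or later, where the pg_stat_subscription_stats view exists (the catalog columns in the sessions below suggest PostgreSQL 16). A failed apply worker typically exits and is restarted, retrying the same change, so replication usually resumes by itself once the missing column is added on the subscriber, unless the subscription was created with disable_on_error (subdisableonerr is f in the subscriber session below).

```sql
-- Run on the SUBSCRIBER. apply_error_count increases each time the apply worker
-- fails, e.g. when the publisher sends a column the local table does not have.
SELECT subname, apply_error_count, sync_error_count, stats_reset
FROM pg_stat_subscription_stats;

-- pid is NULL while the main apply worker is not running (e.g. between restarts
-- after an error); received_lsn and last_msg_receipt_time show whether
-- changes are still arriving from the publisher.
SELECT subname, pid, received_lsn, latest_end_lsn, last_msg_receipt_time
FROM pg_stat_subscription;
```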

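The example below uses table t3, which is published and subscribed. A column is to be added to it, i.e. a DDL statement must be run. Running it on the subscriber first and then on the publisher leaves replication unaffected throughout.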
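Before and after each DDL step, it can help to compare what the publisher will send for t3 with what the subscriber can accept. A sketch, assuming PostgreSQL 15 or later (where pg_publication_tables has an attnames column) and the object names used in this example:

```sql
-- On the PUBLISHER: the columns of t3 that master_db01_pulication replicates.
SELECT pubname, schemaname, tablename, attnames
FROM pg_publication_tables
WHERE pubname = 'master_db01_pulication' AND tablename = 't3';

-- On the SUBSCRIBER: the columns t3 actually has locally.
-- Every name returned by the publisher query must also appear here;
-- extra subscriber-only columns are allowed and get their default (NULL here).
SELECT array_agg(attname ORDER BY attnum) AS local_columns
FROM pg_attribute
WHERE attrelid = 'public.t3'::regclass
  AND attnum > 0
  AND NOT attisdropped;
```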

1. Primary node: logical replication slot and publication information

 

2. Replica node: subscription information

 

3. Write data on the primary node

 

4. The replica node replicates normally

 

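5. Run the DDL on the subscriber (replica) first. Meanwhile, before the DDL is run on the primary, insert one more test row into t3 on the primary; the replica still replicates the primary's data normally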

 

6. Run the DDL on the publisher (primary), then write data into the new column

 

7. The subscriber (replica) continues to replicate normally
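To confirm step 7 numerically rather than by reading rows, the publisher's logical slot can be checked. A sketch using the slot name from the primary session below (db01_logic_replication_slot01); lag_bytes should shrink toward zero once the subscriber has applied and confirmed everything, though on a quiet system it may not read exactly zero at every instant.

```sql
-- Run on the PUBLISHER: how far the subscriber's confirmed position
-- trails the current WAL insert position, in bytes.
SELECT slot_name,
       active,
       confirmed_flush_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'db01_logic_replication_slot01';
```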

 

Primary node test session

db01=# \x
Expanded display is on.
db01=#
db01=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+------------------------------
slot_name           | pgstandby_slave01
plugin              |
slot_type           | physical
datoid              |
database            |
temporary           | f
active              | f
active_pid          |
xmin                |
catalog_xmin        |
restart_lsn         | 1/20518D30
confirmed_flush_lsn |
wal_status          | reserved
safe_wal_size       |
two_phase           | f
conflicting         |
-[ RECORD 2 ]-------+------------------------------
slot_name           | db01_logic_replication_slot01
plugin              | pgoutput
slot_type           | logical
datoid              | 16400
database            | db01
temporary           | f
active              | t
active_pid          | 33412
xmin                |
catalog_xmin        | 1171
restart_lsn         | 1/214B8040
confirmed_flush_lsn | 1/214B8078
wal_status          | reserved
safe_wal_size       |
two_phase           | f
conflicting         | f

db01=#
db01=# \x off
Expanded display is off.
db01=#
db01=#  select * from pg_catalog.pg_publication;
  oid  |        pubname         | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+------------------------+----------+--------------+-----------+-----------+-----------+-------------+------------
 41149 | master_db01_pulication |       10 | f            | t         | t         | f         | f           | f
(1 row)

db01=#
db01=# SELECT oid, prpubid, prrelid::regclass FROM pg_publication_rel;
  oid  | prpubid | prrelid
-------+---------+---------
 41150 |   41149 | t1
 41151 |   41149 | t2
 41158 |   41149 | t3
(3 rows)

db01=#
db01=# \d t3
                                     Table "public.t3"
 Column |            Type             | Collation | Nullable |           Default
--------+-----------------------------+-----------+----------+------------------------------
 c1     | integer                     |           | not null | generated always as identity
 c2     | character varying(100)      |           |          |
 c3     | timestamp without time zone |           |          |
Indexes:
    "t3_pkey" PRIMARY KEY, btree (c1)
Publications:
    "master_db01_pulication"

db01=#
db01=# insert into t3(c1,c2,c3) values (1,'aaa',now());
ERROR:  cannot insert a non-DEFAULT value into column "c1"
DETAIL:  Column "c1" is an identity column defined as GENERATED ALWAYS.
HINT:  Use OVERRIDING SYSTEM VALUE to override.
db01=#
db01=# insert into t3(c2,c3) values ('aaa',now());
INSERT 0 1
db01=#
db01=# select * from t3
db01-# ;
 c1 | c2  |            c3
----+-----+--------------------------
  1 | aaa | 2025-06-26 14:31:50.1698
(1 row)

db01=#
db01=#
db01=# insert into t3(c2,c3) values ('bbb',now());
INSERT 0 1
db01=#
db01=# select * from t3;
 c1 | c2  |             c3
----+-----+----------------------------
  1 | aaa | 2025-06-26 14:31:50.1698
  2 | bbb | 2025-06-26 14:35:15.171404
(2 rows)

db01=# alter table t3 add c4 timestamp;
ALTER TABLE
db01=#
db01=# insert into t3(c2,c3) values ('ccc',now(),now());
ERROR:  INSERT has more expressions than target columns
LINE 1: insert into t3(c2,c3) values ('ccc',now(),now());
                                                  ^
db01=# insert into t3(c2,c3,c4) values ('ccc',now(),now());
INSERT 0 1
db01=#
db01=# select * from t3;
 c1 | c2  |             c3             |             c4
----+-----+----------------------------+----------------------------
  1 | aaa | 2025-06-26 14:31:50.1698   |
  2 | bbb | 2025-06-26 14:35:15.171404 |
  3 | ccc | 2025-06-26 14:37:43.571553 | 2025-06-26 14:37:43.571553
(3 rows)

db01=#
db01=#

 

Replica node test session

mydb01=#  select * from pg_subscription;
-[ RECORD 1 ]-------+---------------------------------------------------------------------------------
oid                 | 16407
subdbid             | 16388
subskiplsn          | 0/0
subname             | slave_db01_subscription
subowner            | 10
subenabled          | t
subbinary           | f
substream           | f
subtwophasestate    | d
subdisableonerr     | f
subpasswordrequired | t
subrunasowner       | f
subconninfo         | host=8.*.*.129 port=9000 dbname=db01 user=replica_user  password=******
subslotname         | db01_logic_replication_slot01
subsynccommit       | off
subpublications     | {master_db01_pulication}
suborigin           | any

mydb01=# SELECT *, srrelid::regclass FROM pg_subscription_rel;
-[ RECORD 1 ]----------
srsubid    | 16407
srrelid    | 16402
srsubstate | r
srsublsn   | 1/21166E58
srrelid    | t3
-[ RECORD 2 ]----------
srsubid    | 16407
srrelid    | 16390
srsubstate | r
srsublsn   | 1/2116F7E0
srrelid    | t1
-[ RECORD 3 ]----------
srsubid    | 16407
srrelid    | 16396
srsubstate | r
srsublsn   | 1/21171BF0
srrelid    | t2

mydb01=#
mydb01=# \d t3
                                     Table "public.t3"
 Column |            Type             | Collation | Nullable |           Default
--------+-----------------------------+-----------+----------+------------------------------
 c1     | integer                     |           | not null | generated always as identity
 c2     | character varying(100)      |           |          |
 c3     | timestamp without time zone |           |          |
Indexes:
    "t3_pkey" PRIMARY KEY, btree (c1)

mydb01=#
mydb01=#
mydb01=# select * from t3;
-[ RECORD 1 ]----------------
c1 | 1
c2 | aaa
c3 | 2025-06-26 14:31:50.1698

mydb01=#
mydb01=# \x off
Expanded display is off.
mydb01=#
mydb01=#
mydb01=# select * from t3;
 c1 | c2  |            c3
----+-----+--------------------------
  1 | aaa | 2025-06-26 14:31:50.1698
(1 row)

mydb01=#
mydb01=# alter table t3 add c4 timestamp;
ALTER TABLE
mydb01=#
mydb01=# select * from t3;
 c1 | c2  |             c3             | c4
----+-----+----------------------------+----
  1 | aaa | 2025-06-26 14:31:50.1698   |
  2 | bbb | 2025-06-26 14:35:15.171404 |
(2 rows)

mydb01=# select * from t3;
 c1 | c2  |             c3             |             c4
----+-----+----------------------------+----------------------------
  1 | aaa | 2025-06-26 14:31:50.1698   |
  2 | bbb | 2025-06-26 14:35:15.171404 |
  3 | ccc | 2025-06-26 14:37:43.571553 | 2025-06-26 14:37:43.571553
(3 rows)

mydb01=#