init
This commit is contained in:
53
db_include/replication/backup_manifest.h
Executable file
53
db_include/replication/backup_manifest.h
Executable file
@@ -0,0 +1,53 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* backup_manifest.h
|
||||
* Routines for generating a backup manifest.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/backup_manifest.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef BACKUP_MANIFEST_H
|
||||
#define BACKUP_MANIFEST_H
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "common/checksum_helper.h"
|
||||
#include "pgtime.h"
|
||||
#include "storage/buffile.h"
|
||||
|
||||
typedef enum manifest_option
|
||||
{
|
||||
MANIFEST_OPTION_YES,
|
||||
MANIFEST_OPTION_NO,
|
||||
MANIFEST_OPTION_FORCE_ENCODE
|
||||
} backup_manifest_option;
|
||||
|
||||
typedef struct backup_manifest_info
|
||||
{
|
||||
BufFile *buffile;
|
||||
pg_checksum_type checksum_type;
|
||||
pg_cryptohash_ctx *manifest_ctx;
|
||||
uint64 manifest_size;
|
||||
bool force_encode;
|
||||
bool first_file;
|
||||
bool still_checksumming;
|
||||
} backup_manifest_info;
|
||||
|
||||
extern void InitializeBackupManifest(backup_manifest_info *manifest,
|
||||
backup_manifest_option want_manifest,
|
||||
pg_checksum_type manifest_checksum_type);
|
||||
extern void AddFileToBackupManifest(backup_manifest_info *manifest,
|
||||
const char *spcoid,
|
||||
const char *pathname, size_t size,
|
||||
pg_time_t mtime,
|
||||
pg_checksum_context *checksum_ctx);
|
||||
extern void AddWALInfoToBackupManifest(backup_manifest_info *manifest,
|
||||
XLogRecPtr startptr,
|
||||
TimeLineID starttli, XLogRecPtr endptr,
|
||||
TimeLineID endtli);
|
||||
extern void SendBackupManifest(backup_manifest_info *manifest);
|
||||
extern void FreeBackupManifest(backup_manifest_info *manifest);
|
||||
|
||||
#endif
|
||||
39
db_include/replication/basebackup.h
Executable file
39
db_include/replication/basebackup.h
Executable file
@@ -0,0 +1,39 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* basebackup.h
|
||||
* Exports from replication/basebackup.c.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/basebackup.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef _BASEBACKUP_H
|
||||
#define _BASEBACKUP_H
|
||||
|
||||
#include "nodes/replnodes.h"
|
||||
|
||||
/*
|
||||
* Minimum and maximum values of MAX_RATE option in BASE_BACKUP command.
|
||||
*/
|
||||
#define MAX_RATE_LOWER 32
|
||||
#define MAX_RATE_UPPER 1048576
|
||||
|
||||
/*
|
||||
* Information about a tablespace
|
||||
*
|
||||
* In some usages, "path" can be NULL to denote the PGDATA directory itself.
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
char *oid; /* tablespace's OID, as a decimal string */
|
||||
char *path; /* full path to tablespace's directory */
|
||||
char *rpath; /* relative path if it's within PGDATA, else
|
||||
* NULL */
|
||||
int64 size; /* total size as sent; -1 if not known */
|
||||
} tablespaceinfo;
|
||||
|
||||
extern void SendBaseBackup(BaseBackupCmd *cmd);
|
||||
|
||||
#endif /* _BASEBACKUP_H */
|
||||
20
db_include/replication/decode.h
Executable file
20
db_include/replication/decode.h
Executable file
@@ -0,0 +1,20 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* decode.h
|
||||
* PostgreSQL WAL to logical transformation
|
||||
*
|
||||
* Portions Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef DECODE_H
|
||||
#define DECODE_H
|
||||
|
||||
#include "access/xlogreader.h"
|
||||
#include "access/xlogrecord.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/reorderbuffer.h"
|
||||
|
||||
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
|
||||
XLogReaderState *record);
|
||||
|
||||
#endif
|
||||
134
db_include/replication/logical.h
Executable file
134
db_include/replication/logical.h
Executable file
@@ -0,0 +1,134 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* logical.h
|
||||
* PostgreSQL logical decoding coordination
|
||||
*
|
||||
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef LOGICAL_H
|
||||
#define LOGICAL_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogreader.h"
|
||||
#include "replication/output_plugin.h"
|
||||
#include "replication/slot.h"
|
||||
|
||||
struct LogicalDecodingContext;
|
||||
|
||||
typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
|
||||
XLogRecPtr Ptr,
|
||||
TransactionId xid,
|
||||
bool last_write
|
||||
);
|
||||
|
||||
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
|
||||
|
||||
typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
|
||||
XLogRecPtr Ptr,
|
||||
TransactionId xid
|
||||
);
|
||||
|
||||
typedef struct LogicalDecodingContext
|
||||
{
|
||||
/* memory context this is all allocated in */
|
||||
MemoryContext context;
|
||||
|
||||
/* The associated replication slot */
|
||||
ReplicationSlot *slot;
|
||||
|
||||
/* infrastructure pieces for decoding */
|
||||
XLogReaderState *reader;
|
||||
struct ReorderBuffer *reorder;
|
||||
struct SnapBuild *snapshot_builder;
|
||||
|
||||
/*
|
||||
* Marks the logical decoding context as fast forward decoding one. Such a
|
||||
* context does not have plugin loaded so most of the following properties
|
||||
* are unused.
|
||||
*/
|
||||
bool fast_forward;
|
||||
|
||||
OutputPluginCallbacks callbacks;
|
||||
OutputPluginOptions options;
|
||||
|
||||
/*
|
||||
* User specified options
|
||||
*/
|
||||
List *output_plugin_options;
|
||||
|
||||
/*
|
||||
* User-Provided callback for writing/streaming out data.
|
||||
*/
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write;
|
||||
LogicalOutputPluginWriterWrite write;
|
||||
LogicalOutputPluginWriterUpdateProgress update_progress;
|
||||
|
||||
/*
|
||||
* Output buffer.
|
||||
*/
|
||||
StringInfo out;
|
||||
|
||||
/*
|
||||
* Private data pointer of the output plugin.
|
||||
*/
|
||||
void *output_plugin_private;
|
||||
|
||||
/*
|
||||
* Private data pointer for the data writer.
|
||||
*/
|
||||
void *output_writer_private;
|
||||
|
||||
/*
|
||||
* Does the output plugin support streaming, and is it enabled?
|
||||
*/
|
||||
bool streaming;
|
||||
|
||||
/*
|
||||
* Does the output plugin support two-phase decoding, and is it enabled?
|
||||
*/
|
||||
bool twophase;
|
||||
|
||||
/*
|
||||
* State for writing output.
|
||||
*/
|
||||
bool accept_writes;
|
||||
bool prepared_write;
|
||||
XLogRecPtr write_location;
|
||||
TransactionId write_xid;
|
||||
} LogicalDecodingContext;
|
||||
|
||||
|
||||
extern void CheckLogicalDecodingRequirements(void);
|
||||
|
||||
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
|
||||
List *output_plugin_options,
|
||||
bool need_full_snapshot,
|
||||
XLogRecPtr restart_lsn,
|
||||
XLogReaderRoutine *xl_routine,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write,
|
||||
LogicalOutputPluginWriterUpdateProgress update_progress);
|
||||
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
List *output_plugin_options,
|
||||
bool fast_forward,
|
||||
XLogReaderRoutine *xl_routine,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write,
|
||||
LogicalOutputPluginWriterUpdateProgress update_progress);
|
||||
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
|
||||
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
|
||||
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
|
||||
|
||||
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
|
||||
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
|
||||
XLogRecPtr restart_lsn);
|
||||
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
|
||||
|
||||
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
|
||||
TransactionId xid, const char *gid);
|
||||
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
|
||||
extern void ResetLogicalStreamingState(void);
|
||||
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
|
||||
|
||||
#endif
|
||||
29
db_include/replication/logicallauncher.h
Executable file
29
db_include/replication/logicallauncher.h
Executable file
@@ -0,0 +1,29 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* logicallauncher.h
|
||||
* Exports for logical replication launcher.
|
||||
*
|
||||
* Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/logicallauncher.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef LOGICALLAUNCHER_H
|
||||
#define LOGICALLAUNCHER_H
|
||||
|
||||
extern int max_logical_replication_workers;
|
||||
extern int max_sync_workers_per_subscription;
|
||||
|
||||
extern void ApplyLauncherRegister(void);
|
||||
extern void ApplyLauncherMain(Datum main_arg);
|
||||
|
||||
extern Size ApplyLauncherShmemSize(void);
|
||||
extern void ApplyLauncherShmemInit(void);
|
||||
|
||||
extern void ApplyLauncherWakeupAtCommit(void);
|
||||
extern void AtEOXact_ApplyLauncher(bool isCommit);
|
||||
|
||||
extern bool IsLogicalLauncher(void);
|
||||
|
||||
#endif /* LOGICALLAUNCHER_H */
|
||||
177
db_include/replication/logicalproto.h
Executable file
177
db_include/replication/logicalproto.h
Executable file
@@ -0,0 +1,177 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* logicalproto.h
|
||||
* logical replication protocol
|
||||
*
|
||||
* Copyright (c) 2015-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/include/replication/logicalproto.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef LOGICAL_PROTO_H
|
||||
#define LOGICAL_PROTO_H
|
||||
|
||||
#include "replication/reorderbuffer.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
/*
|
||||
* Protocol capabilities
|
||||
*
|
||||
* LOGICALREP_PROTO_VERSION_NUM is our native protocol.
|
||||
* LOGICALREP_PROTO_MAX_VERSION_NUM is the greatest version we can support.
|
||||
* LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
|
||||
* have backwards compatibility for. The client requests protocol version at
|
||||
* connect time.
|
||||
*
|
||||
* LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
|
||||
* support for streaming large transactions.
|
||||
*/
|
||||
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
|
||||
#define LOGICALREP_PROTO_VERSION_NUM 1
|
||||
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
|
||||
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
|
||||
|
||||
/*
|
||||
* Logical message types
|
||||
*
|
||||
* Used by logical replication wire protocol.
|
||||
*
|
||||
* Note: though this is an enum, the values are used to identify message types
|
||||
* in logical replication protocol, which uses a single byte to identify a
|
||||
* message type. Hence the values should be single-byte wide and preferably
|
||||
* human-readable characters.
|
||||
*/
|
||||
typedef enum LogicalRepMsgType
|
||||
{
|
||||
LOGICAL_REP_MSG_BEGIN = 'B',
|
||||
LOGICAL_REP_MSG_COMMIT = 'C',
|
||||
LOGICAL_REP_MSG_ORIGIN = 'O',
|
||||
LOGICAL_REP_MSG_INSERT = 'I',
|
||||
LOGICAL_REP_MSG_UPDATE = 'U',
|
||||
LOGICAL_REP_MSG_DELETE = 'D',
|
||||
LOGICAL_REP_MSG_TRUNCATE = 'T',
|
||||
LOGICAL_REP_MSG_RELATION = 'R',
|
||||
LOGICAL_REP_MSG_TYPE = 'Y',
|
||||
LOGICAL_REP_MSG_MESSAGE = 'M',
|
||||
LOGICAL_REP_MSG_STREAM_START = 'S',
|
||||
LOGICAL_REP_MSG_STREAM_END = 'E',
|
||||
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
|
||||
LOGICAL_REP_MSG_STREAM_ABORT = 'A'
|
||||
} LogicalRepMsgType;
|
||||
|
||||
/*
|
||||
* This struct stores a tuple received via logical replication.
|
||||
* Keep in mind that the columns correspond to the *remote* table.
|
||||
*/
|
||||
typedef struct LogicalRepTupleData
|
||||
{
|
||||
/* Array of StringInfos, one per column; some may be unused */
|
||||
StringInfoData *colvalues;
|
||||
/* Array of markers for null/unchanged/text/binary, one per column */
|
||||
char *colstatus;
|
||||
/* Length of above arrays */
|
||||
int ncols;
|
||||
} LogicalRepTupleData;
|
||||
|
||||
/* Possible values for LogicalRepTupleData.colstatus[colnum] */
|
||||
/* These values are also used in the on-the-wire protocol */
|
||||
#define LOGICALREP_COLUMN_NULL 'n'
|
||||
#define LOGICALREP_COLUMN_UNCHANGED 'u'
|
||||
#define LOGICALREP_COLUMN_TEXT 't'
|
||||
#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */
|
||||
|
||||
typedef uint32 LogicalRepRelId;
|
||||
|
||||
/* Relation information */
|
||||
typedef struct LogicalRepRelation
|
||||
{
|
||||
/* Info coming from the remote side. */
|
||||
LogicalRepRelId remoteid; /* unique id of the relation */
|
||||
char *nspname; /* schema name */
|
||||
char *relname; /* relation name */
|
||||
int natts; /* number of columns */
|
||||
char **attnames; /* column names */
|
||||
Oid *atttyps; /* column types */
|
||||
char replident; /* replica identity */
|
||||
char relkind; /* remote relation kind */
|
||||
Bitmapset *attkeys; /* Bitmap of key columns */
|
||||
} LogicalRepRelation;
|
||||
|
||||
/* Type mapping info */
|
||||
typedef struct LogicalRepTyp
|
||||
{
|
||||
Oid remoteid; /* unique id of the remote type */
|
||||
char *nspname; /* schema name of remote type */
|
||||
char *typname; /* name of the remote type */
|
||||
} LogicalRepTyp;
|
||||
|
||||
/* Transaction info */
|
||||
typedef struct LogicalRepBeginData
|
||||
{
|
||||
XLogRecPtr final_lsn;
|
||||
TimestampTz committime;
|
||||
TransactionId xid;
|
||||
} LogicalRepBeginData;
|
||||
|
||||
typedef struct LogicalRepCommitData
|
||||
{
|
||||
XLogRecPtr commit_lsn;
|
||||
XLogRecPtr end_lsn;
|
||||
TimestampTz committime;
|
||||
} LogicalRepCommitData;
|
||||
|
||||
extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
|
||||
extern void logicalrep_read_begin(StringInfo in,
|
||||
LogicalRepBeginData *begin_data);
|
||||
extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
extern void logicalrep_read_commit(StringInfo in,
|
||||
LogicalRepCommitData *commit_data);
|
||||
extern void logicalrep_write_origin(StringInfo out, const char *origin,
|
||||
XLogRecPtr origin_lsn);
|
||||
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
|
||||
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
|
||||
Relation rel, HeapTuple newtuple,
|
||||
bool binary);
|
||||
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
|
||||
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
|
||||
Relation rel, HeapTuple oldtuple,
|
||||
HeapTuple newtuple, bool binary);
|
||||
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
|
||||
bool *has_oldtuple, LogicalRepTupleData *oldtup,
|
||||
LogicalRepTupleData *newtup);
|
||||
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
|
||||
Relation rel, HeapTuple oldtuple,
|
||||
bool binary);
|
||||
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
|
||||
LogicalRepTupleData *oldtup);
|
||||
extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
|
||||
int nrelids, Oid relids[],
|
||||
bool cascade, bool restart_seqs);
|
||||
extern List *logicalrep_read_truncate(StringInfo in,
|
||||
bool *cascade, bool *restart_seqs);
|
||||
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
|
||||
bool transactional, const char *prefix, Size sz, const char *message);
|
||||
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
|
||||
Relation rel);
|
||||
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
|
||||
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
|
||||
Oid typoid);
|
||||
extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
|
||||
extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
|
||||
bool first_segment);
|
||||
extern TransactionId logicalrep_read_stream_start(StringInfo in,
|
||||
bool *first_segment);
|
||||
extern void logicalrep_write_stream_stop(StringInfo out);
|
||||
extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
extern TransactionId logicalrep_read_stream_commit(StringInfo out,
|
||||
LogicalRepCommitData *commit_data);
|
||||
extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
|
||||
TransactionId subxid);
|
||||
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
|
||||
TransactionId *subxid);
|
||||
|
||||
#endif /* LOGICAL_PROTO_H */
|
||||
49
db_include/replication/logicalrelation.h
Executable file
49
db_include/replication/logicalrelation.h
Executable file
@@ -0,0 +1,49 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* logicalrelation.h
|
||||
* Relation definitions for logical replication relation mapping.
|
||||
*
|
||||
* Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/logicalrelation.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef LOGICALRELATION_H
|
||||
#define LOGICALRELATION_H
|
||||
|
||||
#include "access/attmap.h"
|
||||
#include "replication/logicalproto.h"
|
||||
|
||||
typedef struct LogicalRepRelMapEntry
|
||||
{
|
||||
LogicalRepRelation remoterel; /* key is remoterel.remoteid */
|
||||
|
||||
/*
|
||||
* Validity flag -- when false, revalidate all derived info at next
|
||||
* logicalrep_rel_open. (While the localrel is open, we assume our lock
|
||||
* on that rel ensures the info remains good.)
|
||||
*/
|
||||
bool localrelvalid;
|
||||
|
||||
/* Mapping to local relation. */
|
||||
Oid localreloid; /* local relation id */
|
||||
Relation localrel; /* relcache entry (NULL when closed) */
|
||||
AttrMap *attrmap; /* map of local attributes to remote ones */
|
||||
bool updatable; /* Can apply updates/deletes? */
|
||||
|
||||
/* Sync state. */
|
||||
char state;
|
||||
XLogRecPtr statelsn;
|
||||
} LogicalRepRelMapEntry;
|
||||
|
||||
extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
|
||||
|
||||
extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
|
||||
LOCKMODE lockmode);
|
||||
extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
|
||||
Relation partrel, AttrMap *map);
|
||||
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
|
||||
LOCKMODE lockmode);
|
||||
|
||||
#endif /* LOGICALRELATION_H */
|
||||
19
db_include/replication/logicalworker.h
Executable file
19
db_include/replication/logicalworker.h
Executable file
@@ -0,0 +1,19 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* logicalworker.h
|
||||
* Exports for logical replication workers.
|
||||
*
|
||||
* Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/logicalworker.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef LOGICALWORKER_H
|
||||
#define LOGICALWORKER_H
|
||||
|
||||
extern void ApplyWorkerMain(Datum main_arg);
|
||||
|
||||
extern bool IsLogicalWorker(void);
|
||||
|
||||
#endif /* LOGICALWORKER_H */
|
||||
41
db_include/replication/message.h
Executable file
41
db_include/replication/message.h
Executable file
@@ -0,0 +1,41 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* message.h
|
||||
* Exports from replication/logical/message.c
|
||||
*
|
||||
* Copyright (c) 2013-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/message.h
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef PG_LOGICAL_MESSAGE_H
|
||||
#define PG_LOGICAL_MESSAGE_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "access/xlogreader.h"
|
||||
|
||||
/*
|
||||
* Generic logical decoding message wal record.
|
||||
*/
|
||||
typedef struct xl_logical_message
|
||||
{
|
||||
Oid dbId; /* database Oid emitted from */
|
||||
bool transactional; /* is message transactional? */
|
||||
Size prefix_size; /* length of prefix */
|
||||
Size message_size; /* size of the message */
|
||||
/* payload, including null-terminated prefix of length prefix_size */
|
||||
char message[FLEXIBLE_ARRAY_MEMBER];
|
||||
} xl_logical_message;
|
||||
|
||||
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
|
||||
|
||||
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
|
||||
size_t size, bool transactional);
|
||||
|
||||
/* RMGR API*/
|
||||
#define XLOG_LOGICAL_MESSAGE 0x00
|
||||
void logicalmsg_redo(XLogReaderState *record);
|
||||
void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
|
||||
const char *logicalmsg_identify(uint8 info);
|
||||
|
||||
#endif /* PG_LOGICAL_MESSAGE_H */
|
||||
73
db_include/replication/origin.h
Executable file
73
db_include/replication/origin.h
Executable file
@@ -0,0 +1,73 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* origin.h
|
||||
* Exports from replication/logical/origin.c
|
||||
*
|
||||
* Copyright (c) 2013-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/origin.h
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef PG_ORIGIN_H
|
||||
#define PG_ORIGIN_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "access/xlogreader.h"
|
||||
#include "catalog/pg_replication_origin.h"
|
||||
|
||||
typedef struct xl_replorigin_set
|
||||
{
|
||||
XLogRecPtr remote_lsn;
|
||||
RepOriginId node_id;
|
||||
bool force;
|
||||
} xl_replorigin_set;
|
||||
|
||||
typedef struct xl_replorigin_drop
|
||||
{
|
||||
RepOriginId node_id;
|
||||
} xl_replorigin_drop;
|
||||
|
||||
#define XLOG_REPLORIGIN_SET 0x00
|
||||
#define XLOG_REPLORIGIN_DROP 0x10
|
||||
|
||||
#define InvalidRepOriginId 0
|
||||
#define DoNotReplicateId PG_UINT16_MAX
|
||||
|
||||
extern PGDLLIMPORT RepOriginId replorigin_session_origin;
|
||||
extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
|
||||
extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
|
||||
|
||||
/* API for querying & manipulating replication origins */
|
||||
extern RepOriginId replorigin_by_name(const char *name, bool missing_ok);
|
||||
extern RepOriginId replorigin_create(const char *name);
|
||||
extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait);
|
||||
extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
|
||||
char **roname);
|
||||
|
||||
/* API for querying & manipulating replication progress tracking */
|
||||
extern void replorigin_advance(RepOriginId node,
|
||||
XLogRecPtr remote_commit,
|
||||
XLogRecPtr local_commit,
|
||||
bool go_backward, bool wal_log);
|
||||
extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
|
||||
|
||||
extern void replorigin_session_advance(XLogRecPtr remote_commit,
|
||||
XLogRecPtr local_commit);
|
||||
extern void replorigin_session_setup(RepOriginId node);
|
||||
extern void replorigin_session_reset(void);
|
||||
extern XLogRecPtr replorigin_session_get_progress(bool flush);
|
||||
|
||||
/* Checkpoint/Startup integration */
|
||||
extern void CheckPointReplicationOrigin(void);
|
||||
extern void StartupReplicationOrigin(void);
|
||||
|
||||
/* WAL logging */
|
||||
void replorigin_redo(XLogReaderState *record);
|
||||
void replorigin_desc(StringInfo buf, XLogReaderState *record);
|
||||
const char *replorigin_identify(uint8 info);
|
||||
|
||||
/* shared memory allocation */
|
||||
extern Size ReplicationOriginShmemSize(void);
|
||||
extern void ReplicationOriginShmemInit(void);
|
||||
|
||||
#endif /* PG_ORIGIN_H */
|
||||
248
db_include/replication/output_plugin.h
Executable file
248
db_include/replication/output_plugin.h
Executable file
@@ -0,0 +1,248 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* output_plugin.h
|
||||
* PostgreSQL Logical Decode Plugin Interface
|
||||
*
|
||||
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef OUTPUT_PLUGIN_H
|
||||
#define OUTPUT_PLUGIN_H
|
||||
|
||||
#include "replication/reorderbuffer.h"
|
||||
|
||||
struct LogicalDecodingContext;
|
||||
struct OutputPluginCallbacks;
|
||||
|
||||
typedef enum OutputPluginOutputType
|
||||
{
|
||||
OUTPUT_PLUGIN_BINARY_OUTPUT,
|
||||
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
|
||||
} OutputPluginOutputType;
|
||||
|
||||
/*
|
||||
* Options set by the output plugin, in the startup callback.
|
||||
*/
|
||||
typedef struct OutputPluginOptions
|
||||
{
|
||||
OutputPluginOutputType output_type;
|
||||
bool receive_rewrites;
|
||||
} OutputPluginOptions;
|
||||
|
||||
/*
|
||||
* Type of the shared library symbol _PG_output_plugin_init that is looked up
|
||||
* when loading an output plugin shared library.
|
||||
*/
|
||||
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
|
||||
|
||||
/*
|
||||
* Callback that gets called in a user-defined plugin. ctx->private_data can
|
||||
* be set to some private data.
|
||||
*
|
||||
* "is_init" will be set to "true" if the decoding slot just got defined. When
|
||||
* the same slot is used from there one, it will be "false".
|
||||
*/
|
||||
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
|
||||
OutputPluginOptions *options,
|
||||
bool is_init);
|
||||
|
||||
/*
|
||||
* Callback called for every (explicit or implicit) BEGIN of a successful
|
||||
* transaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn);
|
||||
|
||||
/*
|
||||
* Callback for every individual change in a successful transaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
Relation relation,
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/*
|
||||
* Callback for every TRUNCATE in a successful transaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
int nrelations,
|
||||
Relation relations[],
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/*
|
||||
* Called for every (explicit or implicit) COMMIT of a successful transaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
|
||||
/*
|
||||
* Called for the generic logical decoding messages.
|
||||
*/
|
||||
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr message_lsn,
|
||||
bool transactional,
|
||||
const char *prefix,
|
||||
Size message_size,
|
||||
const char *message);
|
||||
|
||||
/*
|
||||
* Filter changes by origin.
|
||||
*/
|
||||
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id);
|
||||
|
||||
/*
|
||||
* Called to shutdown an output plugin.
|
||||
*/
|
||||
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
|
||||
|
||||
/*
|
||||
* Called before decoding of PREPARE record to decide whether this
|
||||
* transaction should be decoded with separate calls to prepare and
|
||||
* commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
|
||||
* and sent as usual transaction.
|
||||
*/
|
||||
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
|
||||
TransactionId xid,
|
||||
const char *gid);
|
||||
|
||||
/*
|
||||
* Callback called for every BEGIN of a prepared trnsaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn);
|
||||
|
||||
/*
|
||||
* Called for PREPARE record unless it was filtered by filter_prepare()
|
||||
* callback.
|
||||
*/
|
||||
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr prepare_lsn);
|
||||
|
||||
/*
|
||||
* Called for COMMIT PREPARED.
|
||||
*/
|
||||
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
|
||||
/*
|
||||
* Called for ROLLBACK PREPARED.
|
||||
*/
|
||||
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr prepare_end_lsn,
|
||||
TimestampTz prepare_time);
|
||||
|
||||
|
||||
/*
|
||||
* Called when starting to stream a block of changes from in-progress
|
||||
* transaction (may be called repeatedly, if it's streamed in multiple
|
||||
* chunks).
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn);
|
||||
|
||||
/*
|
||||
* Called when stopping to stream a block of changes from in-progress
|
||||
* transaction to a remote node (may be called repeatedly, if it's streamed
|
||||
* in multiple chunks).
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn);
|
||||
|
||||
/*
|
||||
* Called to discard changes streamed to remote node from in-progress
|
||||
* transaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr abort_lsn);
|
||||
|
||||
/*
|
||||
* Called to prepare changes streamed to remote node from in-progress
|
||||
* transaction. This is called as part of a two-phase commit.
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr prepare_lsn);
|
||||
|
||||
/*
|
||||
* Called to apply changes streamed to remote node from in-progress
|
||||
* transaction.
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
|
||||
/*
|
||||
* Callback for streaming individual changes from in-progress transactions.
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
Relation relation,
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/*
|
||||
* Callback for streaming generic logical decoding messages from in-progress
|
||||
* transactions.
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr message_lsn,
|
||||
bool transactional,
|
||||
const char *prefix,
|
||||
Size message_size,
|
||||
const char *message);
|
||||
|
||||
/*
|
||||
* Callback for streaming truncates from in-progress transactions.
|
||||
*/
|
||||
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn,
|
||||
int nrelations,
|
||||
Relation relations[],
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/*
|
||||
* Output plugin callbacks
|
||||
*/
|
||||
typedef struct OutputPluginCallbacks
|
||||
{
|
||||
LogicalDecodeStartupCB startup_cb;
|
||||
LogicalDecodeBeginCB begin_cb;
|
||||
LogicalDecodeChangeCB change_cb;
|
||||
LogicalDecodeTruncateCB truncate_cb;
|
||||
LogicalDecodeCommitCB commit_cb;
|
||||
LogicalDecodeMessageCB message_cb;
|
||||
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
|
||||
LogicalDecodeShutdownCB shutdown_cb;
|
||||
|
||||
/* streaming of changes at prepare time */
|
||||
LogicalDecodeFilterPrepareCB filter_prepare_cb;
|
||||
LogicalDecodeBeginPrepareCB begin_prepare_cb;
|
||||
LogicalDecodePrepareCB prepare_cb;
|
||||
LogicalDecodeCommitPreparedCB commit_prepared_cb;
|
||||
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
|
||||
|
||||
/* streaming of changes */
|
||||
LogicalDecodeStreamStartCB stream_start_cb;
|
||||
LogicalDecodeStreamStopCB stream_stop_cb;
|
||||
LogicalDecodeStreamAbortCB stream_abort_cb;
|
||||
LogicalDecodeStreamPrepareCB stream_prepare_cb;
|
||||
LogicalDecodeStreamCommitCB stream_commit_cb;
|
||||
LogicalDecodeStreamChangeCB stream_change_cb;
|
||||
LogicalDecodeStreamMessageCB stream_message_cb;
|
||||
LogicalDecodeStreamTruncateCB stream_truncate_cb;
|
||||
} OutputPluginCallbacks;
|
||||
|
||||
/* Functions in replication/logical/logical.c */
|
||||
extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
|
||||
extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
|
||||
extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
|
||||
|
||||
#endif /* OUTPUT_PLUGIN_H */
|
||||
32
db_include/replication/pgoutput.h
Executable file
32
db_include/replication/pgoutput.h
Executable file
@@ -0,0 +1,32 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* pgoutput.h
|
||||
* Logical Replication output plugin
|
||||
*
|
||||
* Copyright (c) 2015-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* pgoutput.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef PGOUTPUT_H
|
||||
#define PGOUTPUT_H
|
||||
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
typedef struct PGOutputData
|
||||
{
|
||||
MemoryContext context; /* private memory context for transient
|
||||
* allocations */
|
||||
|
||||
/* client-supplied info: */
|
||||
uint32 protocol_version;
|
||||
List *publication_names;
|
||||
List *publications;
|
||||
bool binary;
|
||||
bool streaming;
|
||||
bool messages;
|
||||
} PGOutputData;
|
||||
|
||||
#endif /* PGOUTPUT_H */
|
||||
681
db_include/replication/reorderbuffer.h
Executable file
681
db_include/replication/reorderbuffer.h
Executable file
@@ -0,0 +1,681 @@
|
||||
/*
|
||||
* reorderbuffer.h
|
||||
* PostgreSQL logical replay/reorder buffer management.
|
||||
*
|
||||
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/reorderbuffer.h
|
||||
*/
|
||||
#ifndef REORDERBUFFER_H
|
||||
#define REORDERBUFFER_H
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "lib/ilist.h"
|
||||
#include "storage/sinval.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "utils/snapshot.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
extern PGDLLIMPORT int logical_decoding_work_mem;
|
||||
|
||||
/* an individual tuple, stored in one chunk of memory */
|
||||
typedef struct ReorderBufferTupleBuf
|
||||
{
|
||||
/* position in preallocated list */
|
||||
slist_node node;
|
||||
|
||||
/* tuple header, the interesting bit for users of logical decoding */
|
||||
HeapTupleData tuple;
|
||||
|
||||
/* pre-allocated size of tuple buffer, different from tuple size */
|
||||
Size alloc_tuple_size;
|
||||
|
||||
/* actual tuple data follows */
|
||||
} ReorderBufferTupleBuf;
|
||||
|
||||
/* pointer to the data stored in a TupleBuf */
|
||||
#define ReorderBufferTupleBufData(p) \
|
||||
((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
|
||||
|
||||
/*
|
||||
* Types of the change passed to a 'change' callback.
|
||||
*
|
||||
* For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
|
||||
* and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
|
||||
* changes. Users of the decoding facilities will never see changes with
|
||||
* *_INTERNAL_* actions.
|
||||
*
|
||||
* The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
|
||||
* changes concern "speculative insertions", their confirmation, and abort
|
||||
* respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of
|
||||
* logical decoding don't have to care about these.
|
||||
*/
|
||||
enum ReorderBufferChangeType
|
||||
{
|
||||
REORDER_BUFFER_CHANGE_INSERT,
|
||||
REORDER_BUFFER_CHANGE_UPDATE,
|
||||
REORDER_BUFFER_CHANGE_DELETE,
|
||||
REORDER_BUFFER_CHANGE_MESSAGE,
|
||||
REORDER_BUFFER_CHANGE_INVALIDATION,
|
||||
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
|
||||
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
|
||||
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
|
||||
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
|
||||
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
|
||||
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
|
||||
REORDER_BUFFER_CHANGE_TRUNCATE
|
||||
};
|
||||
|
||||
/* forward declaration */
|
||||
struct ReorderBufferTXN;
|
||||
|
||||
/*
|
||||
* a single 'change', can be an insert (with one tuple), an update (old, new),
|
||||
* or a delete (old).
|
||||
*
|
||||
* The same struct is also used internally for other purposes but that should
|
||||
* never be visible outside reorderbuffer.c.
|
||||
*/
|
||||
typedef struct ReorderBufferChange
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
|
||||
/* The type of change. */
|
||||
enum ReorderBufferChangeType action;
|
||||
|
||||
/* Transaction this change belongs to. */
|
||||
struct ReorderBufferTXN *txn;
|
||||
|
||||
RepOriginId origin_id;
|
||||
|
||||
/*
|
||||
* Context data for the change. Which part of the union is valid depends
|
||||
* on action.
|
||||
*/
|
||||
union
|
||||
{
|
||||
/* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
|
||||
struct
|
||||
{
|
||||
/* relation that has been changed */
|
||||
RelFileNode relnode;
|
||||
|
||||
/* no previously reassembled toast chunks are necessary anymore */
|
||||
bool clear_toast_afterwards;
|
||||
|
||||
/* valid for DELETE || UPDATE */
|
||||
ReorderBufferTupleBuf *oldtuple;
|
||||
/* valid for INSERT || UPDATE */
|
||||
ReorderBufferTupleBuf *newtuple;
|
||||
} tp;
|
||||
|
||||
/*
|
||||
* Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
|
||||
* set of relations to be truncated.
|
||||
*/
|
||||
struct
|
||||
{
|
||||
Size nrelids;
|
||||
bool cascade;
|
||||
bool restart_seqs;
|
||||
Oid *relids;
|
||||
} truncate;
|
||||
|
||||
/* Message with arbitrary data. */
|
||||
struct
|
||||
{
|
||||
char *prefix;
|
||||
Size message_size;
|
||||
char *message;
|
||||
} msg;
|
||||
|
||||
/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
|
||||
Snapshot snapshot;
|
||||
|
||||
/*
|
||||
* New command id for existing snapshot in a catalog changing tx. Set
|
||||
* when action == *_INTERNAL_COMMAND_ID.
|
||||
*/
|
||||
CommandId command_id;
|
||||
|
||||
/*
|
||||
* New cid mapping for catalog changing transaction, set when action
|
||||
* == *_INTERNAL_TUPLECID.
|
||||
*/
|
||||
struct
|
||||
{
|
||||
RelFileNode node;
|
||||
ItemPointerData tid;
|
||||
CommandId cmin;
|
||||
CommandId cmax;
|
||||
CommandId combocid;
|
||||
} tuplecid;
|
||||
|
||||
/* Invalidation. */
|
||||
struct
|
||||
{
|
||||
uint32 ninvalidations; /* Number of messages */
|
||||
SharedInvalidationMessage *invalidations; /* invalidation message */
|
||||
} inval;
|
||||
} data;
|
||||
|
||||
/*
|
||||
* While in use this is how a change is linked into a transactions,
|
||||
* otherwise it's the preallocated list.
|
||||
*/
|
||||
dlist_node node;
|
||||
} ReorderBufferChange;
|
||||
|
||||
/* ReorderBufferTXN txn_flags */
|
||||
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
|
||||
#define RBTXN_IS_SUBXACT 0x0002
|
||||
#define RBTXN_IS_SERIALIZED 0x0004
|
||||
#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
|
||||
#define RBTXN_IS_STREAMED 0x0010
|
||||
#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
|
||||
#define RBTXN_PREPARE 0x0040
|
||||
#define RBTXN_SKIPPED_PREPARE 0x0080
|
||||
|
||||
/* Does the transaction have catalog changes? */
|
||||
#define rbtxn_has_catalog_changes(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
|
||||
)
|
||||
|
||||
/* Is the transaction known as a subxact? */
|
||||
#define rbtxn_is_known_subxact(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
|
||||
)
|
||||
|
||||
/* Has this transaction been spilled to disk? */
|
||||
#define rbtxn_is_serialized(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
|
||||
)
|
||||
|
||||
/* Has this transaction ever been spilled to disk? */
|
||||
#define rbtxn_is_serialized_clear(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
|
||||
)
|
||||
|
||||
/* Has this transaction contains partial changes? */
|
||||
#define rbtxn_has_partial_change(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
|
||||
)
|
||||
|
||||
/*
|
||||
* Has this transaction been streamed to downstream?
|
||||
*
|
||||
* (It's not possible to deduce this from nentries and nentries_mem for
|
||||
* various reasons. For example, all changes may be in subtransactions in
|
||||
* which case we'd have nentries==0 for the toplevel one, which would say
|
||||
* nothing about the streaming. So we maintain this flag, but only for the
|
||||
* toplevel transaction.)
|
||||
*/
|
||||
#define rbtxn_is_streamed(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
|
||||
)
|
||||
|
||||
/* Has this transaction been prepared? */
|
||||
#define rbtxn_prepared(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
|
||||
)
|
||||
|
||||
/* prepare for this transaction skipped? */
|
||||
#define rbtxn_skip_prepared(txn) \
|
||||
( \
|
||||
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
|
||||
)
|
||||
|
||||
typedef struct ReorderBufferTXN
|
||||
{
|
||||
/* See above */
|
||||
bits32 txn_flags;
|
||||
|
||||
/* The transaction's transaction id, can be a toplevel or sub xid. */
|
||||
TransactionId xid;
|
||||
|
||||
/* Xid of top-level transaction, if known */
|
||||
TransactionId toplevel_xid;
|
||||
|
||||
/*
|
||||
* Global transaction id required for identification of prepared
|
||||
* transactions.
|
||||
*/
|
||||
char *gid;
|
||||
|
||||
/*
|
||||
* LSN of the first data carrying, WAL record with knowledge about this
|
||||
* xid. This is allowed to *not* be first record adorned with this xid, if
|
||||
* the previous records aren't relevant for logical decoding.
|
||||
*/
|
||||
XLogRecPtr first_lsn;
|
||||
|
||||
/* ----
|
||||
* LSN of the record that lead to this xact to be prepared or committed or
|
||||
* aborted. This can be a
|
||||
* * plain commit record
|
||||
* * plain commit record, of a parent transaction
|
||||
* * prepared tansaction
|
||||
* * prepared transaction commit
|
||||
* * plain abort record
|
||||
* * prepared transaction abort
|
||||
*
|
||||
* This can also become set to earlier values than transaction end when
|
||||
* a transaction is spilled to disk; specifically it's set to the LSN of
|
||||
* the latest change written to disk so far.
|
||||
* ----
|
||||
*/
|
||||
XLogRecPtr final_lsn;
|
||||
|
||||
/*
|
||||
* LSN pointing to the end of the commit record + 1.
|
||||
*/
|
||||
XLogRecPtr end_lsn;
|
||||
|
||||
/* Toplevel transaction for this subxact (NULL for top-level). */
|
||||
struct ReorderBufferTXN *toptxn;
|
||||
|
||||
/*
|
||||
* LSN of the last lsn at which snapshot information reside, so we can
|
||||
* restart decoding from there and fully recover this transaction from
|
||||
* WAL.
|
||||
*/
|
||||
XLogRecPtr restart_decoding_lsn;
|
||||
|
||||
/* origin of the change that caused this transaction */
|
||||
RepOriginId origin_id;
|
||||
XLogRecPtr origin_lsn;
|
||||
|
||||
/*
|
||||
* Commit or Prepare time, only known when we read the actual commit or
|
||||
* prepare record.
|
||||
*/
|
||||
TimestampTz commit_time;
|
||||
|
||||
/*
|
||||
* The base snapshot is used to decode all changes until either this
|
||||
* transaction modifies the catalog, or another catalog-modifying
|
||||
* transaction commits.
|
||||
*/
|
||||
Snapshot base_snapshot;
|
||||
XLogRecPtr base_snapshot_lsn;
|
||||
dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
|
||||
|
||||
/*
|
||||
* Snapshot/CID from the previous streaming run. Only valid for already
|
||||
* streamed transactions (NULL/InvalidCommandId otherwise).
|
||||
*/
|
||||
Snapshot snapshot_now;
|
||||
CommandId command_id;
|
||||
|
||||
/*
|
||||
* How many ReorderBufferChange's do we have in this txn.
|
||||
*
|
||||
* Changes in subtransactions are *not* included but tracked separately.
|
||||
*/
|
||||
uint64 nentries;
|
||||
|
||||
/*
|
||||
* How many of the above entries are stored in memory in contrast to being
|
||||
* spilled to disk.
|
||||
*/
|
||||
uint64 nentries_mem;
|
||||
|
||||
/*
|
||||
* List of ReorderBufferChange structs, including new Snapshots, new
|
||||
* CommandIds and command invalidation messages.
|
||||
*/
|
||||
dlist_head changes;
|
||||
|
||||
/*
|
||||
* List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
|
||||
* Those are always assigned to the toplevel transaction. (Keep track of
|
||||
* #entries to create a hash of the right size)
|
||||
*/
|
||||
dlist_head tuplecids;
|
||||
uint64 ntuplecids;
|
||||
|
||||
/*
|
||||
* On-demand built hash for looking up the above values.
|
||||
*/
|
||||
HTAB *tuplecid_hash;
|
||||
|
||||
/*
|
||||
* Hash containing (potentially partial) toast entries. NULL if no toast
|
||||
* tuples have been found for the current change.
|
||||
*/
|
||||
HTAB *toast_hash;
|
||||
|
||||
/*
|
||||
* non-hierarchical list of subtransactions that are *not* aborted. Only
|
||||
* used in toplevel transactions.
|
||||
*/
|
||||
dlist_head subtxns;
|
||||
uint32 nsubtxns;
|
||||
|
||||
/*
|
||||
* Stored cache invalidations. This is not a linked list because we get
|
||||
* all the invalidations at once.
|
||||
*/
|
||||
uint32 ninvalidations;
|
||||
SharedInvalidationMessage *invalidations;
|
||||
|
||||
/* ---
|
||||
* Position in one of three lists:
|
||||
* * list of subtransactions if we are *known* to be subxact
|
||||
* * list of toplevel xacts (can be an as-yet unknown subxact)
|
||||
* * list of preallocated ReorderBufferTXNs (if unused)
|
||||
* ---
|
||||
*/
|
||||
dlist_node node;
|
||||
|
||||
/*
|
||||
* Size of this transaction (changes currently in memory, in bytes).
|
||||
*/
|
||||
Size size;
|
||||
|
||||
/* Size of top-transaction including sub-transactions. */
|
||||
Size total_size;
|
||||
|
||||
/* If we have detected concurrent abort then ignore future changes. */
|
||||
bool concurrent_abort;
|
||||
|
||||
/*
|
||||
* Private data pointer of the output plugin.
|
||||
*/
|
||||
void *output_plugin_private;
|
||||
} ReorderBufferTXN;
|
||||
|
||||
/* so we can define the callbacks used inside struct ReorderBuffer itself */
|
||||
typedef struct ReorderBuffer ReorderBuffer;
|
||||
|
||||
/* change callback signature */
|
||||
typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
Relation relation,
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/* truncate callback signature */
|
||||
typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
int nrelations,
|
||||
Relation relations[],
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/* begin callback signature */
|
||||
typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn);
|
||||
|
||||
/* commit callback signature */
|
||||
typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
|
||||
/* message callback signature */
|
||||
typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr message_lsn,
|
||||
bool transactional,
|
||||
const char *prefix, Size sz,
|
||||
const char *message);
|
||||
|
||||
/* begin prepare callback signature */
|
||||
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn);
|
||||
|
||||
/* prepare callback signature */
|
||||
typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr prepare_lsn);
|
||||
|
||||
/* commit prepared callback signature */
|
||||
typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
|
||||
/* rollback prepared callback signature */
|
||||
typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr prepare_end_lsn,
|
||||
TimestampTz prepare_time);
|
||||
|
||||
/* start streaming transaction callback signature */
|
||||
typedef void (*ReorderBufferStreamStartCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr first_lsn);
|
||||
|
||||
/* stop streaming transaction callback signature */
|
||||
typedef void (*ReorderBufferStreamStopCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr last_lsn);
|
||||
|
||||
/* discard streamed transaction callback signature */
|
||||
typedef void (*ReorderBufferStreamAbortCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr abort_lsn);
|
||||
|
||||
/* prepare streamed transaction callback signature */
|
||||
typedef void (*ReorderBufferStreamPrepareCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr prepare_lsn);
|
||||
|
||||
/* commit streamed transaction callback signature */
|
||||
typedef void (*ReorderBufferStreamCommitCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
|
||||
/* stream change callback signature */
|
||||
typedef void (*ReorderBufferStreamChangeCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
Relation relation,
|
||||
ReorderBufferChange *change);
|
||||
|
||||
/* stream message callback signature */
|
||||
typedef void (*ReorderBufferStreamMessageCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
XLogRecPtr message_lsn,
|
||||
bool transactional,
|
||||
const char *prefix, Size sz,
|
||||
const char *message);
|
||||
|
||||
/* stream truncate callback signature */
|
||||
typedef void (*ReorderBufferStreamTruncateCB) (
|
||||
ReorderBuffer *rb,
|
||||
ReorderBufferTXN *txn,
|
||||
int nrelations,
|
||||
Relation relations[],
|
||||
ReorderBufferChange *change);
|
||||
|
||||
struct ReorderBuffer
|
||||
{
|
||||
/*
|
||||
* xid => ReorderBufferTXN lookup table
|
||||
*/
|
||||
HTAB *by_txn;
|
||||
|
||||
/*
|
||||
* Transactions that could be a toplevel xact, ordered by LSN of the first
|
||||
* record bearing that xid.
|
||||
*/
|
||||
dlist_head toplevel_by_lsn;
|
||||
|
||||
/*
|
||||
* Transactions and subtransactions that have a base snapshot, ordered by
|
||||
* LSN of the record which caused us to first obtain the base snapshot.
|
||||
* This is not the same as toplevel_by_lsn, because we only set the base
|
||||
* snapshot on the first logical-decoding-relevant record (eg. heap
|
||||
* writes), whereas the initial LSN could be set by other operations.
|
||||
*/
|
||||
dlist_head txns_by_base_snapshot_lsn;
|
||||
|
||||
/*
|
||||
* one-entry sized cache for by_txn. Very frequently the same txn gets
|
||||
* looked up over and over again.
|
||||
*/
|
||||
TransactionId by_txn_last_xid;
|
||||
ReorderBufferTXN *by_txn_last_txn;
|
||||
|
||||
/*
|
||||
* Callbacks to be called when a transactions commits.
|
||||
*/
|
||||
ReorderBufferBeginCB begin;
|
||||
ReorderBufferApplyChangeCB apply_change;
|
||||
ReorderBufferApplyTruncateCB apply_truncate;
|
||||
ReorderBufferCommitCB commit;
|
||||
ReorderBufferMessageCB message;
|
||||
|
||||
/*
|
||||
* Callbacks to be called when streaming a transaction at prepare time.
|
||||
*/
|
||||
ReorderBufferBeginCB begin_prepare;
|
||||
ReorderBufferPrepareCB prepare;
|
||||
ReorderBufferCommitPreparedCB commit_prepared;
|
||||
ReorderBufferRollbackPreparedCB rollback_prepared;
|
||||
|
||||
/*
|
||||
* Callbacks to be called when streaming a transaction.
|
||||
*/
|
||||
ReorderBufferStreamStartCB stream_start;
|
||||
ReorderBufferStreamStopCB stream_stop;
|
||||
ReorderBufferStreamAbortCB stream_abort;
|
||||
ReorderBufferStreamPrepareCB stream_prepare;
|
||||
ReorderBufferStreamCommitCB stream_commit;
|
||||
ReorderBufferStreamChangeCB stream_change;
|
||||
ReorderBufferStreamMessageCB stream_message;
|
||||
ReorderBufferStreamTruncateCB stream_truncate;
|
||||
|
||||
/*
|
||||
* Pointer that will be passed untouched to the callbacks.
|
||||
*/
|
||||
void *private_data;
|
||||
|
||||
/*
|
||||
* Saved output plugin option
|
||||
*/
|
||||
bool output_rewrites;
|
||||
|
||||
/*
|
||||
* Private memory context.
|
||||
*/
|
||||
MemoryContext context;
|
||||
|
||||
/*
|
||||
* Memory contexts for specific types objects
|
||||
*/
|
||||
MemoryContext change_context;
|
||||
MemoryContext txn_context;
|
||||
MemoryContext tup_context;
|
||||
|
||||
XLogRecPtr current_restart_decoding_lsn;
|
||||
|
||||
/* buffer for disk<->memory conversions */
|
||||
char *outbuf;
|
||||
Size outbufsize;
|
||||
|
||||
/* memory accounting */
|
||||
Size size;
|
||||
|
||||
/*
|
||||
* Statistics about transactions spilled to disk.
|
||||
*
|
||||
* A single transaction may be spilled repeatedly, which is why we keep
|
||||
* two different counters. For spilling, the transaction counter includes
|
||||
* both toplevel transactions and subtransactions.
|
||||
*/
|
||||
int64 spillTxns; /* number of transactions spilled to disk */
|
||||
int64 spillCount; /* spill-to-disk invocation counter */
|
||||
int64 spillBytes; /* amount of data spilled to disk */
|
||||
|
||||
/* Statistics about transactions streamed to the decoding output plugin */
|
||||
int64 streamTxns; /* number of transactions streamed */
|
||||
int64 streamCount; /* streaming invocation counter */
|
||||
int64 streamBytes; /* amount of data decoded */
|
||||
|
||||
/*
|
||||
* Statistics about all the transactions sent to the decoding output
|
||||
* plugin
|
||||
*/
|
||||
int64 totalTxns; /* total number of transactions sent */
|
||||
int64 totalBytes; /* total amount of data decoded */
|
||||
};
|
||||
|
||||
|
||||
ReorderBuffer *ReorderBufferAllocate(void);
|
||||
void ReorderBufferFree(ReorderBuffer *);
|
||||
|
||||
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
|
||||
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
|
||||
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
|
||||
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool);
|
||||
|
||||
Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
|
||||
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
|
||||
|
||||
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
|
||||
XLogRecPtr lsn, ReorderBufferChange *,
|
||||
bool toast_insert);
|
||||
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
|
||||
bool transactional, const char *prefix,
|
||||
Size message_size, const char *message);
|
||||
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
|
||||
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
XLogRecPtr initial_consistent_point,
|
||||
TimestampTz commit_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn,
|
||||
char *gid, bool is_commit);
|
||||
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
|
||||
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
|
||||
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
|
||||
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
|
||||
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
|
||||
void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
|
||||
|
||||
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
|
||||
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
|
||||
void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
|
||||
CommandId cid);
|
||||
void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
|
||||
RelFileNode node, ItemPointerData pt,
|
||||
CommandId cmin, CommandId cmax, CommandId combocid);
|
||||
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
|
||||
Size nmsgs, SharedInvalidationMessage *msgs);
|
||||
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
|
||||
SharedInvalidationMessage *invalidations);
|
||||
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
|
||||
|
||||
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
|
||||
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
|
||||
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
|
||||
|
||||
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz prepare_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn);
|
||||
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
|
||||
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
|
||||
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
|
||||
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
|
||||
|
||||
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
|
||||
|
||||
void StartupReorderBuffer(void);
|
||||
|
||||
#endif
|
||||
227
db_include/replication/slot.h
Executable file
227
db_include/replication/slot.h
Executable file
@@ -0,0 +1,227 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* slot.h
|
||||
* Replication slot management.
|
||||
*
|
||||
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef SLOT_H
|
||||
#define SLOT_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogreader.h"
|
||||
#include "storage/condition_variable.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/spin.h"
|
||||
#include "replication/walreceiver.h"
|
||||
|
||||
/*
|
||||
* Behaviour of replication slots, upon release or crash.
|
||||
*
|
||||
* Slots marked as PERSISTENT are crash-safe and will not be dropped when
|
||||
* released. Slots marked as EPHEMERAL will be dropped when released or after
|
||||
* restarts. Slots marked TEMPORARY will be dropped at the end of a session
|
||||
* or on error.
|
||||
*
|
||||
* EPHEMERAL is used as a not-quite-ready state when creating persistent
|
||||
* slots. EPHEMERAL slots can be made PERSISTENT by calling
|
||||
* ReplicationSlotPersist(). For a slot that goes away at the end of a
|
||||
* session, TEMPORARY is the appropriate choice.
|
||||
*/
|
||||
typedef enum ReplicationSlotPersistency
|
||||
{
|
||||
RS_PERSISTENT,
|
||||
RS_EPHEMERAL,
|
||||
RS_TEMPORARY
|
||||
} ReplicationSlotPersistency;
|
||||
|
||||
/*
|
||||
* On-Disk data of a replication slot, preserved across restarts.
|
||||
*/
|
||||
typedef struct ReplicationSlotPersistentData
|
||||
{
|
||||
/* The slot's identifier */
|
||||
NameData name;
|
||||
|
||||
/* database the slot is active on */
|
||||
Oid database;
|
||||
|
||||
/*
|
||||
* The slot's behaviour when being dropped (or restored after a crash).
|
||||
*/
|
||||
ReplicationSlotPersistency persistency;
|
||||
|
||||
/*
|
||||
* xmin horizon for data
|
||||
*
|
||||
* NB: This may represent a value that hasn't been written to disk yet;
|
||||
* see notes for effective_xmin, below.
|
||||
*/
|
||||
TransactionId xmin;
|
||||
|
||||
/*
|
||||
* xmin horizon for catalog tuples
|
||||
*
|
||||
* NB: This may represent a value that hasn't been written to disk yet;
|
||||
* see notes for effective_xmin, below.
|
||||
*/
|
||||
TransactionId catalog_xmin;
|
||||
|
||||
/* oldest LSN that might be required by this replication slot */
|
||||
XLogRecPtr restart_lsn;
|
||||
|
||||
/* restart_lsn is copied here when the slot is invalidated */
|
||||
XLogRecPtr invalidated_at;
|
||||
|
||||
/*
|
||||
* Oldest LSN that the client has acked receipt for. This is used as the
|
||||
* start_lsn point in case the client doesn't specify one, and also as a
|
||||
* safety measure to jump forwards in case the client specifies a
|
||||
* start_lsn that's further in the past than this value.
|
||||
*/
|
||||
XLogRecPtr confirmed_flush;
|
||||
|
||||
/*
|
||||
* LSN at which we found a consistent point at the time of slot creation.
|
||||
* This is also the point where we have exported a snapshot for the
|
||||
* initial copy.
|
||||
*/
|
||||
XLogRecPtr initial_consistent_point;
|
||||
|
||||
/*
|
||||
* Allow decoding of prepared transactions?
|
||||
*/
|
||||
bool two_phase;
|
||||
|
||||
/* plugin name */
|
||||
NameData plugin;
|
||||
} ReplicationSlotPersistentData;
|
||||
|
||||
/*
|
||||
* Shared memory state of a single replication slot.
|
||||
*
|
||||
* The in-memory data of replication slots follows a locking model based
|
||||
* on two linked concepts:
|
||||
* - A replication slot's in_use flag is switched when added or discarded using
|
||||
* the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
|
||||
* mode when updating the flag by the backend owning the slot and doing the
|
||||
* operation, while readers (concurrent backends not owning the slot) need
|
||||
* to hold it in shared mode when looking at replication slot data.
|
||||
* - Individual fields are protected by mutex where only the backend owning
|
||||
* the slot is authorized to update the fields from its own slot. The
|
||||
* backend owning the slot does not need to take this lock when reading its
|
||||
* own fields, while concurrent backends not owning this slot should take the
|
||||
* lock when reading this slot's data.
|
||||
*/
|
||||
typedef struct ReplicationSlot
|
||||
{
|
||||
/* lock, on same cacheline as effective_xmin */
|
||||
slock_t mutex;
|
||||
|
||||
/* is this slot defined */
|
||||
bool in_use;
|
||||
|
||||
/* Who is streaming out changes for this slot? 0 in unused slots. */
|
||||
pid_t active_pid;
|
||||
|
||||
/* any outstanding modifications? */
|
||||
bool just_dirtied;
|
||||
bool dirty;
|
||||
|
||||
/*
|
||||
* For logical decoding, it's extremely important that we never remove any
|
||||
* data that's still needed for decoding purposes, even after a crash;
|
||||
* otherwise, decoding will produce wrong answers. Ordinary streaming
|
||||
* replication also needs to prevent old row versions from being removed
|
||||
* too soon, but the worst consequence we might encounter there is
|
||||
* unwanted query cancellations on the standby. Thus, for logical
|
||||
* decoding, this value represents the latest xmin that has actually been
|
||||
* written to disk, whereas for streaming replication, it's just the same
|
||||
* as the persistent value (data.xmin).
|
||||
*/
|
||||
TransactionId effective_xmin;
|
||||
TransactionId effective_catalog_xmin;
|
||||
|
||||
/* data surviving shutdowns and crashes */
|
||||
ReplicationSlotPersistentData data;
|
||||
|
||||
/* is somebody performing io on this slot? */
|
||||
LWLock io_in_progress_lock;
|
||||
|
||||
/* Condition variable signaled when active_pid changes */
|
||||
ConditionVariable active_cv;
|
||||
|
||||
/* all the remaining data is only used for logical slots */
|
||||
|
||||
/*
|
||||
* When the client has confirmed flushes >= candidate_xmin_lsn we can
|
||||
* advance the catalog xmin. When restart_valid has been passed,
|
||||
* restart_lsn can be increased.
|
||||
*/
|
||||
TransactionId candidate_catalog_xmin;
|
||||
XLogRecPtr candidate_xmin_lsn;
|
||||
XLogRecPtr candidate_restart_valid;
|
||||
XLogRecPtr candidate_restart_lsn;
|
||||
} ReplicationSlot;
|
||||
|
||||
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
|
||||
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
|
||||
|
||||
/*
|
||||
* Shared memory control area for all of replication slots.
|
||||
*/
|
||||
typedef struct ReplicationSlotCtlData
|
||||
{
|
||||
/*
|
||||
* This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
|
||||
* reason you can't do that in an otherwise-empty struct.
|
||||
*/
|
||||
ReplicationSlot replication_slots[1];
|
||||
} ReplicationSlotCtlData;
|
||||
|
||||
/*
|
||||
* Pointers to shared memory
|
||||
*/
|
||||
extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
|
||||
extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
|
||||
|
||||
/* GUCs */
|
||||
extern PGDLLIMPORT int max_replication_slots;
|
||||
|
||||
/* shmem initialization functions */
|
||||
extern Size ReplicationSlotsShmemSize(void);
|
||||
extern void ReplicationSlotsShmemInit(void);
|
||||
|
||||
/* management of individual slots */
|
||||
extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
||||
ReplicationSlotPersistency p, bool two_phase);
|
||||
extern void ReplicationSlotPersist(void);
|
||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||
|
||||
extern void ReplicationSlotAcquire(const char *name, bool nowait);
|
||||
extern void ReplicationSlotRelease(void);
|
||||
extern void ReplicationSlotCleanup(void);
|
||||
extern void ReplicationSlotSave(void);
|
||||
extern void ReplicationSlotMarkDirty(void);
|
||||
|
||||
/* misc stuff */
|
||||
extern bool ReplicationSlotValidateName(const char *name, int elevel);
|
||||
extern void ReplicationSlotReserveWal(void);
|
||||
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
|
||||
extern void ReplicationSlotsComputeRequiredLSN(void);
|
||||
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
|
||||
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
|
||||
extern void ReplicationSlotsDropDBSlots(Oid dboid);
|
||||
extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
|
||||
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
|
||||
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
|
||||
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
|
||||
|
||||
extern void StartupReplicationSlots(void);
|
||||
extern void CheckPointReplicationSlots(void);
|
||||
|
||||
extern void CheckSlotRequirements(void);
|
||||
|
||||
#endif /* SLOT_H */
|
||||
93
db_include/replication/snapbuild.h
Executable file
93
db_include/replication/snapbuild.h
Executable file
@@ -0,0 +1,93 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* snapbuild.h
|
||||
* Exports from replication/logical/snapbuild.c.
|
||||
*
|
||||
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/snapbuild.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef SNAPBUILD_H
|
||||
#define SNAPBUILD_H
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "utils/snapmgr.h"
|
||||
|
||||
typedef enum
|
||||
{
|
||||
/*
|
||||
* Initial state, we can't do much yet.
|
||||
*/
|
||||
SNAPBUILD_START = -1,
|
||||
|
||||
/*
|
||||
* Collecting committed transactions, to build the initial catalog
|
||||
* snapshot.
|
||||
*/
|
||||
SNAPBUILD_BUILDING_SNAPSHOT = 0,
|
||||
|
||||
/*
|
||||
* We have collected enough information to decode tuples in transactions
|
||||
* that started after this.
|
||||
*
|
||||
* Once we reached this we start to collect changes. We cannot apply them
|
||||
* yet, because they might be based on transactions that were still
|
||||
* running when FULL_SNAPSHOT was reached.
|
||||
*/
|
||||
SNAPBUILD_FULL_SNAPSHOT = 1,
|
||||
|
||||
/*
|
||||
* Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
|
||||
* were running at that point finished. Till we reach that we hold off
|
||||
* calling any commit callbacks.
|
||||
*/
|
||||
SNAPBUILD_CONSISTENT = 2
|
||||
} SnapBuildState;
|
||||
|
||||
/* forward declare so we don't have to expose the struct to the public */
|
||||
struct SnapBuild;
|
||||
typedef struct SnapBuild SnapBuild;
|
||||
|
||||
/* forward declare so we don't have to include reorderbuffer.h */
|
||||
struct ReorderBuffer;
|
||||
|
||||
/* forward declare so we don't have to include heapam_xlog.h */
|
||||
struct xl_heap_new_cid;
|
||||
struct xl_running_xacts;
|
||||
|
||||
extern void CheckPointSnapBuild(void);
|
||||
|
||||
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
|
||||
TransactionId xmin_horizon, XLogRecPtr start_lsn,
|
||||
bool need_full_snapshot,
|
||||
XLogRecPtr initial_consistent_point);
|
||||
extern void FreeSnapshotBuilder(SnapBuild *cache);
|
||||
|
||||
extern void SnapBuildSnapDecRefcount(Snapshot snap);
|
||||
|
||||
extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
|
||||
extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
|
||||
extern void SnapBuildClearExportedSnapshot(void);
|
||||
extern void SnapBuildResetExportedSnapshotState(void);
|
||||
|
||||
extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
|
||||
extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
|
||||
TransactionId xid);
|
||||
|
||||
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
|
||||
extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
|
||||
|
||||
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
|
||||
TransactionId xid, int nsubxacts,
|
||||
TransactionId *subxacts);
|
||||
extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
|
||||
XLogRecPtr lsn);
|
||||
extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
|
||||
XLogRecPtr lsn, struct xl_heap_new_cid *cid);
|
||||
extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
|
||||
struct xl_running_xacts *running);
|
||||
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
|
||||
|
||||
#endif /* SNAPBUILD_H */
|
||||
115
db_include/replication/syncrep.h
Executable file
115
db_include/replication/syncrep.h
Executable file
@@ -0,0 +1,115 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* syncrep.h
|
||||
* Exports from replication/syncrep.c.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/include/replication/syncrep.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef _SYNCREP_H
|
||||
#define _SYNCREP_H
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
#define SyncRepRequested() \
|
||||
(max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
|
||||
|
||||
/* SyncRepWaitMode */
|
||||
#define SYNC_REP_NO_WAIT (-1)
|
||||
#define SYNC_REP_WAIT_WRITE 0
|
||||
#define SYNC_REP_WAIT_FLUSH 1
|
||||
#define SYNC_REP_WAIT_APPLY 2
|
||||
|
||||
#define NUM_SYNC_REP_WAIT_MODE 3
|
||||
|
||||
/* syncRepState */
|
||||
#define SYNC_REP_NOT_WAITING 0
|
||||
#define SYNC_REP_WAITING 1
|
||||
#define SYNC_REP_WAIT_COMPLETE 2
|
||||
|
||||
/* syncrep_method of SyncRepConfigData */
|
||||
#define SYNC_REP_PRIORITY 0
|
||||
#define SYNC_REP_QUORUM 1
|
||||
|
||||
/*
|
||||
* SyncRepGetCandidateStandbys returns an array of these structs,
|
||||
* one per candidate synchronous walsender.
|
||||
*/
|
||||
typedef struct SyncRepStandbyData
|
||||
{
|
||||
/* Copies of relevant fields from WalSnd shared-memory struct */
|
||||
pid_t pid;
|
||||
XLogRecPtr write;
|
||||
XLogRecPtr flush;
|
||||
XLogRecPtr apply;
|
||||
int sync_standby_priority;
|
||||
/* Index of this walsender in the WalSnd shared-memory array */
|
||||
int walsnd_index;
|
||||
/* This flag indicates whether this struct is about our own process */
|
||||
bool is_me;
|
||||
} SyncRepStandbyData;
|
||||
|
||||
/*
|
||||
* Struct for the configuration of synchronous replication.
|
||||
*
|
||||
* Note: this must be a flat representation that can be held in a single
|
||||
* chunk of malloc'd memory, so that it can be stored as the "extra" data
|
||||
* for the synchronous_standby_names GUC.
|
||||
*/
|
||||
typedef struct SyncRepConfigData
|
||||
{
|
||||
int config_size; /* total size of this struct, in bytes */
|
||||
int num_sync; /* number of sync standbys that we need to
|
||||
* wait for */
|
||||
uint8 syncrep_method; /* method to choose sync standbys */
|
||||
int nmembers; /* number of members in the following list */
|
||||
/* member_names contains nmembers consecutive nul-terminated C strings */
|
||||
char member_names[FLEXIBLE_ARRAY_MEMBER];
|
||||
} SyncRepConfigData;
|
||||
|
||||
extern SyncRepConfigData *SyncRepConfig;
|
||||
|
||||
/* communication variables for parsing synchronous_standby_names GUC */
|
||||
extern SyncRepConfigData *syncrep_parse_result;
|
||||
extern char *syncrep_parse_error_msg;
|
||||
|
||||
/* user-settable parameters for synchronous replication */
|
||||
extern char *SyncRepStandbyNames;
|
||||
|
||||
/* called by user backend */
|
||||
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
|
||||
|
||||
/* called at backend exit */
|
||||
extern void SyncRepCleanupAtProcExit(void);
|
||||
|
||||
/* called by wal sender */
|
||||
extern void SyncRepInitConfig(void);
|
||||
extern void SyncRepReleaseWaiters(void);
|
||||
|
||||
/* called by wal sender and user backend */
|
||||
extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
|
||||
|
||||
/* called by checkpointer */
|
||||
extern void SyncRepUpdateSyncStandbysDefined(void);
|
||||
|
||||
/* GUC infrastructure */
|
||||
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
|
||||
extern void assign_synchronous_standby_names(const char *newval, void *extra);
|
||||
extern void assign_synchronous_commit(int newval, void *extra);
|
||||
|
||||
/*
|
||||
* Internal functions for parsing synchronous_standby_names grammar,
|
||||
* in syncrep_gram.y and syncrep_scanner.l
|
||||
*/
|
||||
extern int syncrep_yyparse(void);
|
||||
extern int syncrep_yylex(void);
|
||||
extern void syncrep_yyerror(const char *str);
|
||||
extern void syncrep_scanner_init(const char *query_string);
|
||||
extern void syncrep_scanner_finish(void);
|
||||
|
||||
#endif /* _SYNCREP_H */
|
||||
469
db_include/replication/walreceiver.h
Executable file
469
db_include/replication/walreceiver.h
Executable file
@@ -0,0 +1,469 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* walreceiver.h
|
||||
* Exports from replication/walreceiverfuncs.c.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/walreceiver.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef _WALRECEIVER_H
|
||||
#define _WALRECEIVER_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "getaddrinfo.h" /* for NI_MAXHOST */
|
||||
#include "pgtime.h"
|
||||
#include "port/atomics.h"
|
||||
#include "replication/logicalproto.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/condition_variable.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/spin.h"
|
||||
#include "utils/tuplestore.h"
|
||||
|
||||
/* user-settable parameters */
|
||||
extern int wal_receiver_status_interval;
|
||||
extern int wal_receiver_timeout;
|
||||
extern bool hot_standby_feedback;
|
||||
|
||||
/*
|
||||
* MAXCONNINFO: maximum size of a connection string.
|
||||
*
|
||||
* XXX: Should this move to pg_config_manual.h?
|
||||
*/
|
||||
#define MAXCONNINFO 1024
|
||||
|
||||
/* Can we allow the standby to accept replication connection from another standby? */
|
||||
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
|
||||
|
||||
/*
|
||||
* Values for WalRcv->walRcvState.
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
WALRCV_STOPPED, /* stopped and mustn't start up again */
|
||||
WALRCV_STARTING, /* launched, but the process hasn't
|
||||
* initialized yet */
|
||||
WALRCV_STREAMING, /* walreceiver is streaming */
|
||||
WALRCV_WAITING, /* stopped streaming, waiting for orders */
|
||||
WALRCV_RESTARTING, /* asked to restart streaming */
|
||||
WALRCV_STOPPING /* requested to stop, but still running */
|
||||
} WalRcvState;
|
||||
|
||||
/* Shared memory area for management of walreceiver process */
|
||||
typedef struct
|
||||
{
|
||||
/*
|
||||
* PID of currently active walreceiver process, its current state and
|
||||
* start time (actually, the time at which it was requested to be
|
||||
* started).
|
||||
*/
|
||||
pid_t pid;
|
||||
WalRcvState walRcvState;
|
||||
ConditionVariable walRcvStoppedCV;
|
||||
pg_time_t startTime;
|
||||
|
||||
/*
|
||||
* receiveStart and receiveStartTLI indicate the first byte position and
|
||||
* timeline that will be received. When startup process starts the
|
||||
* walreceiver, it sets these to the point where it wants the streaming to
|
||||
* begin.
|
||||
*/
|
||||
XLogRecPtr receiveStart;
|
||||
TimeLineID receiveStartTLI;
|
||||
|
||||
/*
|
||||
* flushedUpto-1 is the last byte position that has already been received,
|
||||
* and receivedTLI is the timeline it came from. At the first startup of
|
||||
* walreceiver, these are set to receiveStart and receiveStartTLI. After
|
||||
* that, walreceiver updates these whenever it flushes the received WAL to
|
||||
* disk.
|
||||
*/
|
||||
XLogRecPtr flushedUpto;
|
||||
TimeLineID receivedTLI;
|
||||
|
||||
/*
|
||||
* latestChunkStart is the starting byte position of the current "batch"
|
||||
* of received WAL. It's actually the same as the previous value of
|
||||
* flushedUpto before the last flush to disk. Startup process can use
|
||||
* this to detect whether it's keeping up or not.
|
||||
*/
|
||||
XLogRecPtr latestChunkStart;
|
||||
|
||||
/*
|
||||
* Time of send and receive of any message received.
|
||||
*/
|
||||
TimestampTz lastMsgSendTime;
|
||||
TimestampTz lastMsgReceiptTime;
|
||||
|
||||
/*
|
||||
* Latest reported end of WAL on the sender
|
||||
*/
|
||||
XLogRecPtr latestWalEnd;
|
||||
TimestampTz latestWalEndTime;
|
||||
|
||||
/*
|
||||
* connection string; initially set to connect to the primary, and later
|
||||
* clobbered to hide security-sensitive fields.
|
||||
*/
|
||||
char conninfo[MAXCONNINFO];
|
||||
|
||||
/*
|
||||
* Host name (this can be a host name, an IP address, or a directory path)
|
||||
* and port number of the active replication connection.
|
||||
*/
|
||||
char sender_host[NI_MAXHOST];
|
||||
int sender_port;
|
||||
|
||||
/*
|
||||
* replication slot name; is also used for walreceiver to connect with the
|
||||
* primary
|
||||
*/
|
||||
char slotname[NAMEDATALEN];
|
||||
|
||||
/*
|
||||
* If it's a temporary replication slot, it needs to be recreated when
|
||||
* connecting.
|
||||
*/
|
||||
bool is_temp_slot;
|
||||
|
||||
/* set true once conninfo is ready to display (obfuscated pwds etc) */
|
||||
bool ready_to_display;
|
||||
|
||||
/*
|
||||
* Latch used by startup process to wake up walreceiver after telling it
|
||||
* where to start streaming (after setting receiveStart and
|
||||
* receiveStartTLI), and also to tell it to send apply feedback to the
|
||||
* primary whenever specially marked commit records are applied. This is
|
||||
* normally mapped to procLatch when walreceiver is running.
|
||||
*/
|
||||
Latch *latch;
|
||||
|
||||
slock_t mutex; /* locks shared variables shown above */
|
||||
|
||||
/*
|
||||
* Like flushedUpto, but advanced after writing and before flushing,
|
||||
* without the need to acquire the spin lock. Data can be read by another
|
||||
* process up to this point, but shouldn't be used for data integrity
|
||||
* purposes.
|
||||
*/
|
||||
pg_atomic_uint64 writtenUpto;
|
||||
|
||||
/*
|
||||
* force walreceiver reply? This doesn't need to be locked; memory
|
||||
* barriers for ordering are sufficient. But we do need atomic fetch and
|
||||
* store semantics, so use sig_atomic_t.
|
||||
*/
|
||||
sig_atomic_t force_reply; /* used as a bool */
|
||||
} WalRcvData;
|
||||
|
||||
extern WalRcvData *WalRcv;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
bool logical; /* True if this is logical replication stream,
|
||||
* false if physical stream. */
|
||||
char *slotname; /* Name of the replication slot or NULL. */
|
||||
XLogRecPtr startpoint; /* LSN of starting point. */
|
||||
|
||||
union
|
||||
{
|
||||
struct
|
||||
{
|
||||
TimeLineID startpointTLI; /* Starting timeline */
|
||||
} physical;
|
||||
struct
|
||||
{
|
||||
uint32 proto_version; /* Logical protocol version */
|
||||
List *publication_names; /* String list of publications */
|
||||
bool binary; /* Ask publisher to use binary */
|
||||
bool streaming; /* Streaming of large transactions */
|
||||
} logical;
|
||||
} proto;
|
||||
} WalRcvStreamOptions;
|
||||
|
||||
struct WalReceiverConn;
|
||||
typedef struct WalReceiverConn WalReceiverConn;
|
||||
|
||||
/*
|
||||
* Status of walreceiver query execution.
|
||||
*
|
||||
* We only define statuses that are currently used.
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
WALRCV_ERROR, /* There was error when executing the query. */
|
||||
WALRCV_OK_COMMAND, /* Query executed utility or replication
|
||||
* command. */
|
||||
WALRCV_OK_TUPLES, /* Query returned tuples. */
|
||||
WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
|
||||
WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
|
||||
WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication
|
||||
* protocol. */
|
||||
} WalRcvExecStatus;
|
||||
|
||||
/*
|
||||
* Return value for walrcv_exec, returns the status of the execution and
|
||||
* tuples if any.
|
||||
*/
|
||||
typedef struct WalRcvExecResult
|
||||
{
|
||||
WalRcvExecStatus status;
|
||||
int sqlstate;
|
||||
char *err;
|
||||
Tuplestorestate *tuplestore;
|
||||
TupleDesc tupledesc;
|
||||
} WalRcvExecResult;
|
||||
|
||||
/* WAL receiver - libpqwalreceiver hooks */
|
||||
|
||||
/*
|
||||
* walrcv_connect_fn
|
||||
*
|
||||
* Establish connection to a cluster. 'logical' is true if the
|
||||
* connection is logical, and false if the connection is physical.
|
||||
* 'appname' is a name associated to the connection, to use for example
|
||||
* with fallback_application_name or application_name. Returns the
|
||||
* details about the connection established, as defined by
|
||||
* WalReceiverConn for each WAL receiver module. On error, NULL is
|
||||
* returned with 'err' including the error generated.
|
||||
*/
|
||||
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
|
||||
bool logical,
|
||||
const char *appname,
|
||||
char **err);
|
||||
|
||||
/*
|
||||
* walrcv_check_conninfo_fn
|
||||
*
|
||||
* Parse and validate the connection string given as of 'conninfo'.
|
||||
*/
|
||||
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
|
||||
|
||||
/*
|
||||
* walrcv_get_conninfo_fn
|
||||
*
|
||||
* Returns a user-displayable conninfo string. Note that any
|
||||
* security-sensitive fields should be obfuscated.
|
||||
*/
|
||||
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
|
||||
|
||||
/*
|
||||
* walrcv_get_senderinfo_fn
|
||||
*
|
||||
* Provide information of the WAL sender this WAL receiver is connected
|
||||
* to, as of 'sender_host' for the host of the sender and 'sender_port'
|
||||
* for its port.
|
||||
*/
|
||||
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
|
||||
char **sender_host,
|
||||
int *sender_port);
|
||||
|
||||
/*
|
||||
* walrcv_identify_system_fn
|
||||
*
|
||||
* Run IDENTIFY_SYSTEM on the cluster connected to and validate the
|
||||
* identity of the cluster. Returns the system ID of the cluster
|
||||
* connected to. 'primary_tli' is the timeline ID of the sender.
|
||||
*/
|
||||
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
|
||||
TimeLineID *primary_tli);
|
||||
|
||||
/*
|
||||
* walrcv_server_version_fn
|
||||
*
|
||||
* Returns the version number of the cluster connected to.
|
||||
*/
|
||||
typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
|
||||
|
||||
/*
|
||||
* walrcv_readtimelinehistoryfile_fn
|
||||
*
|
||||
* Fetch from cluster the timeline history file for timeline 'tli'.
|
||||
* Returns the name of the timeline history file as of 'filename', its
|
||||
* contents as of 'content' and its 'size'.
|
||||
*/
|
||||
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
|
||||
TimeLineID tli,
|
||||
char **filename,
|
||||
char **content,
|
||||
int *size);
|
||||
|
||||
/*
|
||||
* walrcv_startstreaming_fn
|
||||
*
|
||||
* Start streaming WAL data from given streaming options. Returns true
|
||||
* if the connection has switched successfully to copy-both mode and false
|
||||
* if the server received the command and executed it successfully, but
|
||||
* didn't switch to copy-mode.
|
||||
*/
|
||||
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
|
||||
const WalRcvStreamOptions *options);
|
||||
|
||||
/*
|
||||
* walrcv_endstreaming_fn
|
||||
*
|
||||
* Stop streaming of WAL data. Returns the next timeline ID of the cluster
|
||||
* connected to in 'next_tli', or 0 if there was no report.
|
||||
*/
|
||||
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
|
||||
TimeLineID *next_tli);
|
||||
|
||||
/*
|
||||
* walrcv_receive_fn
|
||||
*
|
||||
* Receive a message available from the WAL stream. 'buffer' is a pointer
|
||||
* to a buffer holding the message received. Returns the length of the data,
|
||||
* 0 if no data is available yet ('wait_fd' is a socket descriptor which can
|
||||
* be waited on before a retry), and -1 if the cluster ended the COPY.
|
||||
*/
|
||||
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
|
||||
char **buffer,
|
||||
pgsocket *wait_fd);
|
||||
|
||||
/*
|
||||
* walrcv_send_fn
|
||||
*
|
||||
* Send a message of size 'nbytes' to the WAL stream with 'buffer' as
|
||||
* contents.
|
||||
*/
|
||||
typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
|
||||
const char *buffer,
|
||||
int nbytes);
|
||||
|
||||
/*
|
||||
* walrcv_create_slot_fn
|
||||
*
|
||||
* Create a new replication slot named 'slotname'. 'temporary' defines
|
||||
* if the slot is temporary. 'snapshot_action' defines the behavior wanted
|
||||
* for an exported snapshot (see replication protocol for more details).
|
||||
* 'lsn' includes the LSN position at which the created slot became
|
||||
* consistent. Returns the name of the exported snapshot for a logical
|
||||
* slot, or NULL for a physical slot.
|
||||
*/
|
||||
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
|
||||
const char *slotname,
|
||||
bool temporary,
|
||||
CRSSnapshotAction snapshot_action,
|
||||
XLogRecPtr *lsn);
|
||||
|
||||
/*
|
||||
* walrcv_get_backend_pid_fn
|
||||
*
|
||||
* Returns the PID of the remote backend process.
|
||||
*/
|
||||
typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
|
||||
|
||||
/*
|
||||
* walrcv_exec_fn
|
||||
*
|
||||
* Send generic queries (and commands) to the remote cluster. 'nRetTypes'
|
||||
* is the expected number of returned attributes, and 'retTypes' an array
|
||||
* including their type OIDs. Returns the status of the execution and
|
||||
* tuples if any.
|
||||
*/
|
||||
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
|
||||
const char *query,
|
||||
const int nRetTypes,
|
||||
const Oid *retTypes);
|
||||
|
||||
/*
|
||||
* walrcv_disconnect_fn
|
||||
*
|
||||
* Disconnect with the cluster.
|
||||
*/
|
||||
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
|
||||
|
||||
typedef struct WalReceiverFunctionsType
|
||||
{
|
||||
walrcv_connect_fn walrcv_connect;
|
||||
walrcv_check_conninfo_fn walrcv_check_conninfo;
|
||||
walrcv_get_conninfo_fn walrcv_get_conninfo;
|
||||
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
|
||||
walrcv_identify_system_fn walrcv_identify_system;
|
||||
walrcv_server_version_fn walrcv_server_version;
|
||||
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
|
||||
walrcv_startstreaming_fn walrcv_startstreaming;
|
||||
walrcv_endstreaming_fn walrcv_endstreaming;
|
||||
walrcv_receive_fn walrcv_receive;
|
||||
walrcv_send_fn walrcv_send;
|
||||
walrcv_create_slot_fn walrcv_create_slot;
|
||||
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
|
||||
walrcv_exec_fn walrcv_exec;
|
||||
walrcv_disconnect_fn walrcv_disconnect;
|
||||
} WalReceiverFunctionsType;
|
||||
|
||||
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
|
||||
|
||||
#define walrcv_connect(conninfo, logical, appname, err) \
|
||||
WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
|
||||
#define walrcv_check_conninfo(conninfo) \
|
||||
WalReceiverFunctions->walrcv_check_conninfo(conninfo)
|
||||
#define walrcv_get_conninfo(conn) \
|
||||
WalReceiverFunctions->walrcv_get_conninfo(conn)
|
||||
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
|
||||
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
|
||||
#define walrcv_identify_system(conn, primary_tli) \
|
||||
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
|
||||
#define walrcv_server_version(conn) \
|
||||
WalReceiverFunctions->walrcv_server_version(conn)
|
||||
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
|
||||
WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
|
||||
#define walrcv_startstreaming(conn, options) \
|
||||
WalReceiverFunctions->walrcv_startstreaming(conn, options)
|
||||
#define walrcv_endstreaming(conn, next_tli) \
|
||||
WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
|
||||
#define walrcv_receive(conn, buffer, wait_fd) \
|
||||
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
|
||||
#define walrcv_send(conn, buffer, nbytes) \
|
||||
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
|
||||
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
|
||||
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
|
||||
#define walrcv_get_backend_pid(conn) \
|
||||
WalReceiverFunctions->walrcv_get_backend_pid(conn)
|
||||
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
|
||||
WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
|
||||
#define walrcv_disconnect(conn) \
|
||||
WalReceiverFunctions->walrcv_disconnect(conn)
|
||||
|
||||
static inline void
|
||||
walrcv_clear_result(WalRcvExecResult *walres)
|
||||
{
|
||||
if (!walres)
|
||||
return;
|
||||
|
||||
if (walres->err)
|
||||
pfree(walres->err);
|
||||
|
||||
if (walres->tuplestore)
|
||||
tuplestore_end(walres->tuplestore);
|
||||
|
||||
if (walres->tupledesc)
|
||||
FreeTupleDesc(walres->tupledesc);
|
||||
|
||||
pfree(walres);
|
||||
}
|
||||
|
||||
/* prototypes for functions in walreceiver.c */
|
||||
extern void WalReceiverMain(void) pg_attribute_noreturn();
|
||||
extern void ProcessWalRcvInterrupts(void);
|
||||
|
||||
/* prototypes for functions in walreceiverfuncs.c */
|
||||
extern Size WalRcvShmemSize(void);
|
||||
extern void WalRcvShmemInit(void);
|
||||
extern void ShutdownWalRcv(void);
|
||||
extern bool WalRcvStreaming(void);
|
||||
extern bool WalRcvRunning(void);
|
||||
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
|
||||
const char *conninfo, const char *slotname,
|
||||
bool create_temp_slot);
|
||||
extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
|
||||
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
|
||||
extern int GetReplicationApplyDelay(void);
|
||||
extern int GetReplicationTransferLatency(void);
|
||||
extern void WalRcvForceReply(void);
|
||||
|
||||
#endif /* _WALRECEIVER_H */
|
||||
74
db_include/replication/walsender.h
Executable file
74
db_include/replication/walsender.h
Executable file
@@ -0,0 +1,74 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* walsender.h
|
||||
* Exports from replication/walsender.c.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/walsender.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef _WALSENDER_H
|
||||
#define _WALSENDER_H
|
||||
|
||||
#include <signal.h>
|
||||
|
||||
/*
|
||||
* What to do with a snapshot in create replication slot command.
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
CRS_EXPORT_SNAPSHOT,
|
||||
CRS_NOEXPORT_SNAPSHOT,
|
||||
CRS_USE_SNAPSHOT
|
||||
} CRSSnapshotAction;
|
||||
|
||||
/* global state */
|
||||
extern bool am_walsender;
|
||||
extern bool am_cascading_walsender;
|
||||
extern bool am_db_walsender;
|
||||
extern bool wake_wal_senders;
|
||||
|
||||
/* user-settable parameters */
|
||||
extern int max_wal_senders;
|
||||
extern int wal_sender_timeout;
|
||||
extern bool log_replication_commands;
|
||||
|
||||
extern void InitWalSender(void);
|
||||
extern bool exec_replication_command(const char *query_string);
|
||||
extern void WalSndErrorCleanup(void);
|
||||
extern void WalSndResourceCleanup(bool isCommit);
|
||||
extern void WalSndSignals(void);
|
||||
extern Size WalSndShmemSize(void);
|
||||
extern void WalSndShmemInit(void);
|
||||
extern void WalSndWakeup(void);
|
||||
extern void WalSndInitStopping(void);
|
||||
extern void WalSndWaitStopping(void);
|
||||
extern void HandleWalSndInitStopping(void);
|
||||
extern void WalSndRqstFileReload(void);
|
||||
|
||||
/*
|
||||
* Remember that we want to wakeup walsenders later
|
||||
*
|
||||
* This is separated from doing the actual wakeup because the writeout is done
|
||||
* while holding contended locks.
|
||||
*/
|
||||
#define WalSndWakeupRequest() \
|
||||
do { wake_wal_senders = true; } while (0)
|
||||
|
||||
/*
|
||||
* wakeup walsenders if there is work to be done
|
||||
*/
|
||||
#define WalSndWakeupProcessRequests() \
|
||||
do \
|
||||
{ \
|
||||
if (wake_wal_senders) \
|
||||
{ \
|
||||
wake_wal_senders = false; \
|
||||
if (max_wal_senders > 0) \
|
||||
WalSndWakeup(); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#endif /* _WALSENDER_H */
|
||||
128
db_include/replication/walsender_private.h
Executable file
128
db_include/replication/walsender_private.h
Executable file
@@ -0,0 +1,128 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* walsender_private.h
|
||||
* Private definitions from replication/walsender.c.
|
||||
*
|
||||
* Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/walsender_private.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef _WALSENDER_PRIVATE_H
|
||||
#define _WALSENDER_PRIVATE_H
|
||||
|
||||
#include "access/xlog.h"
|
||||
#include "nodes/nodes.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/spin.h"
|
||||
|
||||
typedef enum WalSndState
|
||||
{
|
||||
WALSNDSTATE_STARTUP = 0,
|
||||
WALSNDSTATE_BACKUP,
|
||||
WALSNDSTATE_CATCHUP,
|
||||
WALSNDSTATE_STREAMING,
|
||||
WALSNDSTATE_STOPPING
|
||||
} WalSndState;
|
||||
|
||||
/*
|
||||
* Each walsender has a WalSnd struct in shared memory.
|
||||
*
|
||||
* This struct is protected by its 'mutex' spinlock field, except that some
|
||||
* members are only written by the walsender process itself, and thus that
|
||||
* process is free to read those members without holding spinlock. pid and
|
||||
* needreload always require the spinlock to be held for all accesses.
|
||||
*/
|
||||
typedef struct WalSnd
|
||||
{
|
||||
pid_t pid; /* this walsender's PID, or 0 if not active */
|
||||
|
||||
WalSndState state; /* this walsender's state */
|
||||
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
|
||||
bool needreload; /* does currently-open file need to be
|
||||
* reloaded? */
|
||||
|
||||
/*
|
||||
* The xlog locations that have been written, flushed, and applied by
|
||||
* standby-side. These may be invalid if the standby-side has not offered
|
||||
* values yet.
|
||||
*/
|
||||
XLogRecPtr write;
|
||||
XLogRecPtr flush;
|
||||
XLogRecPtr apply;
|
||||
|
||||
/* Measured lag times, or -1 for unknown/none. */
|
||||
TimeOffset writeLag;
|
||||
TimeOffset flushLag;
|
||||
TimeOffset applyLag;
|
||||
|
||||
/*
|
||||
* The priority order of the standby managed by this WALSender, as listed
|
||||
* in synchronous_standby_names, or 0 if not-listed.
|
||||
*/
|
||||
int sync_standby_priority;
|
||||
|
||||
/* Protects shared variables shown above. */
|
||||
slock_t mutex;
|
||||
|
||||
/*
|
||||
* Pointer to the walsender's latch. Used by backends to wake up this
|
||||
* walsender when it has work to do. NULL if the walsender isn't active.
|
||||
*/
|
||||
Latch *latch;
|
||||
|
||||
/*
|
||||
* Timestamp of the last message received from standby.
|
||||
*/
|
||||
TimestampTz replyTime;
|
||||
} WalSnd;
|
||||
|
||||
extern WalSnd *MyWalSnd;
|
||||
|
||||
/* There is one WalSndCtl struct for the whole database cluster */
|
||||
typedef struct
|
||||
{
|
||||
/*
|
||||
* Synchronous replication queue with one queue per request type.
|
||||
* Protected by SyncRepLock.
|
||||
*/
|
||||
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
|
||||
|
||||
/*
|
||||
* Current location of the head of the queue. All waiters should have a
|
||||
* waitLSN that follows this value. Protected by SyncRepLock.
|
||||
*/
|
||||
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
|
||||
|
||||
/*
|
||||
* Are any sync standbys defined? Waiting backends can't reload the
|
||||
* config file safely, so checkpointer updates this value as needed.
|
||||
* Protected by SyncRepLock.
|
||||
*/
|
||||
bool sync_standbys_defined;
|
||||
|
||||
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
|
||||
} WalSndCtlData;
|
||||
|
||||
extern WalSndCtlData *WalSndCtl;
|
||||
|
||||
|
||||
extern void WalSndSetState(WalSndState state);
|
||||
|
||||
/*
|
||||
* Internal functions for parsing the replication grammar, in repl_gram.y and
|
||||
* repl_scanner.l
|
||||
*/
|
||||
extern int replication_yyparse(void);
|
||||
extern int replication_yylex(void);
|
||||
extern void replication_yyerror(const char *str) pg_attribute_noreturn();
|
||||
extern void replication_scanner_init(const char *query_string);
|
||||
extern void replication_scanner_finish(void);
|
||||
extern bool replication_scanner_is_replication_command(void);
|
||||
|
||||
extern Node *replication_parse_result;
|
||||
|
||||
#endif /* _WALSENDER_PRIVATE_H */
|
||||
99
db_include/replication/worker_internal.h
Executable file
99
db_include/replication/worker_internal.h
Executable file
@@ -0,0 +1,99 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* worker_internal.h
|
||||
* Internal headers shared by logical replication workers.
|
||||
*
|
||||
* Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group
|
||||
*
|
||||
* src/include/replication/worker_internal.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef WORKER_INTERNAL_H
|
||||
#define WORKER_INTERNAL_H
|
||||
|
||||
#include <signal.h>
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
#include "catalog/pg_subscription.h"
|
||||
#include "datatype/timestamp.h"
|
||||
#include "storage/lock.h"
|
||||
#include "storage/spin.h"
|
||||
|
||||
|
||||
typedef struct LogicalRepWorker
|
||||
{
|
||||
/* Time at which this worker was launched. */
|
||||
TimestampTz launch_time;
|
||||
|
||||
/* Indicates if this slot is used or free. */
|
||||
bool in_use;
|
||||
|
||||
/* Increased every time the slot is taken by new worker. */
|
||||
uint16 generation;
|
||||
|
||||
/* Pointer to proc array. NULL if not running. */
|
||||
PGPROC *proc;
|
||||
|
||||
/* Database id to connect to. */
|
||||
Oid dbid;
|
||||
|
||||
/* User to use for connection (will be same as owner of subscription). */
|
||||
Oid userid;
|
||||
|
||||
/* Subscription id for the worker. */
|
||||
Oid subid;
|
||||
|
||||
/* Used for initial table synchronization. */
|
||||
Oid relid;
|
||||
char relstate;
|
||||
XLogRecPtr relstate_lsn;
|
||||
slock_t relmutex;
|
||||
|
||||
/* Stats. */
|
||||
XLogRecPtr last_lsn;
|
||||
TimestampTz last_send_time;
|
||||
TimestampTz last_recv_time;
|
||||
XLogRecPtr reply_lsn;
|
||||
TimestampTz reply_time;
|
||||
} LogicalRepWorker;
|
||||
|
||||
/* Main memory context for apply worker. Permanent during worker lifetime. */
|
||||
extern MemoryContext ApplyContext;
|
||||
|
||||
/* libpqreceiver connection */
|
||||
extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
|
||||
|
||||
/* Worker and subscription objects. */
|
||||
extern Subscription *MySubscription;
|
||||
extern LogicalRepWorker *MyLogicalRepWorker;
|
||||
|
||||
extern bool in_remote_transaction;
|
||||
|
||||
extern void logicalrep_worker_attach(int slot);
|
||||
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
|
||||
bool only_running);
|
||||
extern List *logicalrep_workers_find(Oid subid, bool only_running);
|
||||
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
|
||||
Oid userid, Oid relid);
|
||||
extern void logicalrep_worker_stop(Oid subid, Oid relid);
|
||||
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
|
||||
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
|
||||
|
||||
extern int logicalrep_sync_worker_count(Oid subid);
|
||||
|
||||
extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
|
||||
char *originname, int szorgname);
|
||||
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
|
||||
|
||||
void process_syncing_tables(XLogRecPtr current_lsn);
|
||||
void invalidate_syncing_table_states(Datum arg, int cacheid,
|
||||
uint32 hashvalue);
|
||||
|
||||
static inline bool
|
||||
am_tablesync_worker(void)
|
||||
{
|
||||
return OidIsValid(MyLogicalRepWorker->relid);
|
||||
}
|
||||
|
||||
#endif /* WORKER_INTERNAL_H */
|
||||
Reference in New Issue
Block a user