PostgreSQL Recipes: Asynchronous Notifications on… a Replica!?

To get asynchronous LISTEN / NOTIFY notifications working on a replica, we need the PostgreSQL source code. As the documentation says:





Transactions started during hot standby are never assigned a transaction ID and cannot write to the write-ahead log. Therefore, attempting any of the following actions produces an error:





LISTEN, NOTIFY







Therefore, we take the async.c file from the PostgreSQL source, rename all of its public (non-static) functions, remove its dependence on transaction IDs, and add handling of the SIGUSR1 signal, so that we end up with this:





src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5739d2b40f..9f62d4ca6b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,3 +1,5 @@
+#include <include.h>
+
 /*-------------------------------------------------------------------------
  *
  * async.c
@@ -46,7 +48,7 @@
  *	  to. In case there is a match it delivers the notification event to its
  *	  frontend.  Non-matching events are simply skipped.
  *
- * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
+ * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in
  *	  a backend-local list which will not be processed until transaction end.
  *
  *	  Duplicate notifications from the same transaction are sent out as one
@@ -56,7 +58,7 @@
  *	  that has been sent, it can easily add some unique string into the extra
  *	  payload parameter.
  *
- *	  When the transaction is ready to commit, PreCommit_Notify() adds the
+ *	  When the transaction is ready to commit, PreCommit_Notify_My() adds the
  *	  pending notifications to the head of the queue. The head pointer of the
  *	  queue always points to the next free position and a position is just a
  *	  page number and the offset in that page. This is done before marking the
@@ -67,7 +69,7 @@
  *	  Once we have put all of the notifications into the queue, we return to
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
- *	  After commit we are called another time (AtCommit_Notify()). Here we
+ *	  After commit we are called another time (AtCommit_Notify_My()). Here we
  *	  make the actual updates to the effective listen state (listenChannels).
  *
  *	  Finally, after we are out of the transaction altogether, we check if
@@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry
 {
 	int			length;			/* total allocated length of entry */
 	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
+//	TransactionId xid;			/* sender's XID */
 	int32		srcPid;			/* sender's PID */
 	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
 } AsyncQueueEntry;
@@ -414,14 +416,16 @@ typedef struct NotificationHash
 
 static NotificationList *pendingNotifies = NULL;
 
+static pqsigfunc pg_async_signal_original = NULL;
+
 /*
- * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * Inbound notifications are initially processed by HandleNotifyInterruptMy(),
  * called from inside a signal handler. That just sets the
  * notifyInterruptPending flag and sets the process
- * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to
  * actually deal with the interrupt.
  */
-volatile sig_atomic_t notifyInterruptPending = false;
+//volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
@@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;
 static bool backendTryAdvanceTail = false;
 
 /* GUC parameter */
-bool		Trace_notify = false;
+//bool		Trace_notify = false;
 
 /* local function prototypes */
 static int	asyncQueuePageDiff(int p, int q);
@@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+static void pg_async_signal(SIGNAL_ARGS) { /* SIGUSR1 handler; installed by Exec_ListenCommit via pqsignal() */
+    HandleNotifyInterruptMy(); /* signal-handler part: flag the pending notify interrupt */
+    if (notifyInterruptPending) ProcessNotifyInterruptMy(); /* NOTE(review): notify processing is started from signal-handler context here -- confirm this is safe */
+    pg_async_signal_original(postgres_signal_arg); /* chain to the handler that pqsignal() returned when this one was installed */
+}
+
 /*
  * Compute the difference between two queue page numbers (i.e., p - q),
  * accounting for wraparound.
@@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)
  * Report space needed for our shared memory area
  */
 Size
-AsyncShmemSize(void)
+AsyncShmemSizeMy(void)
 {
 	Size		size;
 
-	/* This had better match AsyncShmemInit */
+	/* This had better match AsyncShmemInitMy */
 	size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
 	size = add_size(size, offsetof(AsyncQueueControl, backend));
 
@@ -526,7 +536,7 @@ AsyncShmemSize(void)
  * Initialize our shared memory area
  */
 void
-AsyncShmemInit(void)
+AsyncShmemInitMy(void)
 {
 	bool		found;
 	Size		size;
@@ -585,7 +595,7 @@ AsyncShmemInit(void)
  *	  SQL function to send a notification event
  */
 Datum
-pg_notify(PG_FUNCTION_ARGS)
+pg_notify_my(PG_FUNCTION_ARGS)
 {
 	const char *channel;
 	const char *payload;
@@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)
 		payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
 
 	/* For NOTIFY as a statement, this is checked in ProcessUtility */
-	PreventCommandDuringRecovery("NOTIFY");
+//	PreventCommandDuringRecovery("NOTIFY");
 
-	Async_Notify(channel, payload);
+	Async_Notify_My(channel, payload);
 
 	PG_RETURN_VOID();
 }
 
 
 /*
- * Async_Notify
+ * Async_Notify_My
  *
  *		This is executed by the SQL notify command.
  *
@@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)
  *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  */
 void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify_My(const char *channel, const char *payload)
 {
 	int			my_level = GetCurrentTransactionNestLevel();
 	size_t		channel_len;
@@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)
 		elog(ERROR, "cannot send notifications from a parallel worker");
 
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Notify(%s)", channel);
+		elog(DEBUG1, "Async_Notify_My(%s)", channel);
 
 	channel_len = channel ? strlen(channel) : 0;
 	payload_len = payload ? strlen(payload) : 0;
@@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)
 		/*
 		 * First notify event in current (sub)xact. Note that we allocate the
 		 * NotificationList in TopTransactionContext; the nestingLevel might
-		 * get changed later by AtSubCommit_Notify.
+		 * get changed later by AtSubCommit_Notify_My.
 		 */
 		notifies = (NotificationList *)
 			MemoryContextAlloc(TopTransactionContext,
@@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)
 	int			my_level = GetCurrentTransactionNestLevel();
 
 	/*
-	 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
+	 * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would
 	 * be too complicated to ensure we get the right interactions of
 	 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
 	 * would be any performance benefit anyway in sane applications.
@@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)
 		/*
 		 * First action in current sub(xact). Note that we allocate the
 		 * ActionList in TopTransactionContext; the nestingLevel might get
-		 * changed later by AtSubCommit_Notify.
+		 * changed later by AtSubCommit_Notify_My.
 		 */
 		actions = (ActionList *)
 			MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
@@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)
 }
 
 /*
- * Async_Listen
+ * Async_Listen_My
  *
  *		This is executed by the SQL listen command.
  */
 void
-Async_Listen(const char *channel)
+Async_Listen_My(const char *channel)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid);
 
 	queue_listen(LISTEN_LISTEN, channel);
 }
 
 /*
- * Async_Unlisten
+ * Async_Unlisten_My
  *
  *		This is executed by the SQL unlisten command.
  */
 void
-Async_Unlisten(const char *channel)
+Async_Unlisten_My(const char *channel)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid);
 
 	/* If we couldn't possibly be listening, no need to queue anything */
 	if (pendingActions == NULL && !unlistenExitRegistered)
@@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)
 }
 
 /*
- * Async_UnlistenAll
+ * Async_UnlistenAll_My
  *
  *		This is invoked by UNLISTEN * command, and also at backend exit.
  */
 void
-Async_UnlistenAll(void)
+Async_UnlistenAll_My(void)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
+		elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid);
 
 	/* If we couldn't possibly be listening, no need to queue anything */
 	if (pendingActions == NULL && !unlistenExitRegistered)
@@ -818,7 +828,7 @@ Async_UnlistenAll(void)
  * change within a transaction.
  */
 Datum
-pg_listening_channels(PG_FUNCTION_ARGS)
+pg_listening_channels_my(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
 
@@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)
 }
 
 /*
- * AtPrepare_Notify
+ * AtPrepare_Notify_My
  *
  *		This is called at the prepare phase of a two-phase
  *		transaction.  Save the state for possible commit later.
  */
 void
-AtPrepare_Notify(void)
+AtPrepare_Notify_My(void)
 {
 	/* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
 	if (pendingActions || pendingNotifies)
@@ -874,7 +884,7 @@ AtPrepare_Notify(void)
 }
 
 /*
- * PreCommit_Notify
+ * PreCommit_Notify_My
  *
  *		This is called at transaction commit, before actually committing to
  *		clog.
@@ -889,7 +899,7 @@ AtPrepare_Notify(void)
  *		we can still throw error if we run out of queue space.
  */
 void
-PreCommit_Notify(void)
+PreCommit_Notify_My(void)
 {
 	ListCell   *p;
 
@@ -897,7 +907,7 @@ PreCommit_Notify(void)
 		return;					/* no relevant statements in this xact */
 
 	if (Trace_notify)
-		elog(DEBUG1, "PreCommit_Notify");
+		elog(DEBUG1, "PreCommit_Notify_My");
 
 	/* Preflight for any pending listen/unlisten actions */
 	if (pendingActions != NULL)
@@ -932,7 +942,7 @@ PreCommit_Notify(void)
 		 * so cheap if we don't, and we'd prefer not to do that work while
 		 * holding NotifyQueueLock.
 		 */
-		(void) GetCurrentTransactionId();
+//		(void) GetCurrentTransactionId();
 
 		/*
 		 * Serialize writers by acquiring a special lock that we hold till
@@ -951,7 +961,7 @@ PreCommit_Notify(void)
 		 * used by the flatfiles mechanism.)
 		 */
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+						 RowExclusiveLock);
 
 		/* Now push the notifications into the queue */
 		backendHasSentNotifications = true;
@@ -984,14 +994,14 @@ PreCommit_Notify(void)
 }
 
 /*
- * AtCommit_Notify
+ * AtCommit_Notify_My
  *
  *		This is called at transaction commit, after committing to clog.
  *
  *		Update listenChannels and clear transaction-local state.
  */
 void
-AtCommit_Notify(void)
+AtCommit_Notify_My(void)
 {
 	ListCell   *p;
 
@@ -1003,7 +1013,7 @@ AtCommit_Notify(void)
 		return;
 
 	if (Trace_notify)
-		elog(DEBUG1, "AtCommit_Notify");
+		elog(DEBUG1, "AtCommit_Notify_My");
 
 	/* Perform any pending listen/unlisten actions */
 	if (pendingActions != NULL)
@@ -1036,7 +1046,7 @@ AtCommit_Notify(void)
 }
 
 /*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My
  *
  * This function must make sure we are ready to catch any incoming messages.
  */
@@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)
 }
 
 /*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * Exec_ListenCommit --- subroutine for AtCommit_Notify_My
  *
  * Add the channel to the list of channels we are listening on.
  */
@@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);
 }
 
 /*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My
  *
  * Remove the specified channel name from listenChannels.
  */
@@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
 	 */
+
+	if (!list_length(listenChannels) && pg_async_signal_original) {
+		pqsignal(SIGUSR1, pg_async_signal_original);
+		pg_async_signal_original = NULL;
+	}
 }
 
 /*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My
  *
  *		Unlisten on all channels for this backend.
  */
@@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)
 
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
+
+	if (pg_async_signal_original) {
+		pqsignal(SIGUSR1, pg_async_signal_original);
+		pg_async_signal_original = NULL;
+	}
 }
 
 /*
- * ProcessCompletedNotifies --- send out signals and self-notifies
+ * ProcessCompletedNotifiesMy --- send out signals and self-notifies
  *
  * This is called from postgres.c just before going idle at the completion
  * of a transaction.  If we issued any notifications in the just-completed
@@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)
  * Also, if we filled enough queue pages with new notifies, try to advance
  * the queue tail pointer.
  *
- * The reason that this is not done in AtCommit_Notify is that there is
+ * The reason that this is not done in AtCommit_Notify_My is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
  * while trying to format messages to our frontend).  An error during
- * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
+ * AtCommit_Notify_My would be a PANIC condition.  The timing is also arranged
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
@@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)
  * NOTE: we are outside of any transaction here.
  */
 void
-ProcessCompletedNotifies(void)
+ProcessCompletedNotifiesMy(void)
 {
+	bool idle = !IsTransactionOrTransactionBlock();
 	MemoryContext caller_context;
 
 	/* Nothing to do if we didn't send any notifications */
@@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)
 	caller_context = CurrentMemoryContext;
 
 	if (Trace_notify)
-		elog(DEBUG1, "ProcessCompletedNotifies");
+		elog(DEBUG1, "ProcessCompletedNotifiesMy");
 
 	/*
 	 * We must run asyncQueueReadAllNotifications inside a transaction, else
 	 * bad things happen if it gets an error.
 	 */
+	if (idle)
 	StartTransactionCommand();
 
 	/* Send signals to other backends */
@@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)
 		asyncQueueAdvanceTail();
 	}
 
+	if (idle)
 	CommitTransactionCommand();
 
 	MemoryContextSwitchTo(caller_context);
@@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
 	entryLength = QUEUEALIGN(entryLength);
 	qe->length = entryLength;
 	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
+//	qe->xid = GetCurrentTransactionId();
 	qe->srcPid = MyProcPid;
 	memcpy(qe->data, n->data, channellen + payloadlen + 2);
 }
@@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
  * occupied.
  */
 Datum
-pg_notification_queue_usage(PG_FUNCTION_ARGS)
+pg_notification_queue_usage_my(PG_FUNCTION_ARGS)
 {
 	double		usage;
 
@@ -1749,7 +1774,7 @@ SignalBackends(void)
 }
 
 /*
- * AtAbort_Notify
+ * AtAbort_Notify_My
  *
  *	This is called at transaction abort.
  *
@@ -1757,10 +1782,10 @@ SignalBackends(void)
  *	executed if the transaction got committed.
  */
 void
-AtAbort_Notify(void)
+AtAbort_Notify_My(void)
 {
 	/*
-	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+	 * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,
 	 * we have registered as a listener but have not made any entry in
 	 * listenChannels.  In that case, deregister again.
 	 */
@@ -1772,12 +1797,12 @@ AtAbort_Notify(void)
 }
 
 /*
- * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ * AtSubCommit_Notify_My() --- Take care of subtransaction commit.
  *
  * Reassign all items in the pending lists to the parent transaction.
  */
 void
-AtSubCommit_Notify(void)
+AtSubCommit_Notify_My(void)
 {
 	int			my_level = GetCurrentTransactionNestLevel();
 
@@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)
 }
 
 /*
- * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ * AtSubAbort_Notify_My() --- Take care of subtransaction abort.
  */
 void
-AtSubAbort_Notify(void)
+AtSubAbort_Notify_My(void)
 {
 	int			my_level = GetCurrentTransactionNestLevel();
 
@@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)
 }
 
 /*
- * HandleNotifyInterrupt
+ * HandleNotifyInterruptMy
  *
  *		Signal handler portion of interrupt handling. Let the backend know
  *		that there's a pending notify interrupt. If we're currently reading
  *		from the client, this will interrupt the read and
- *		ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
+ *		ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().
  */
 void
-HandleNotifyInterrupt(void)
+HandleNotifyInterruptMy(void)
 {
 	/*
 	 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
@@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)
 }
 
 /*
- * ProcessNotifyInterrupt
+ * ProcessNotifyInterruptMy
  *
  *		This is called if we see notifyInterruptPending set, just before
  *		transmitting ReadyForQuery at the end of a frontend command, and
  *		also if a notify signal occurs while reading from the frontend.
- *		HandleNotifyInterrupt() will cause the read to be interrupted
+ *		HandleNotifyInterruptMy() will cause the read to be interrupted
  *		via the process's latch, and this routine will get called.
  *		If we are truly idle (ie, *not* inside a transaction block),
  *		process the incoming notifies.
  */
 void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterruptMy(void)
 {
 	if (IsTransactionOrTransactionBlock())
 		return;					/* not really idle */
@@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)
 	 * before we see them.
 	 *----------
 	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
+//	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)
 	PG_END_TRY();
 
 	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
+//	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		/* Ignore messages destined for other databases */
 		if (qe->dboid == MyDatabaseId)
 		{
+#if 0
 			if (XidInMVCCSnapshot(qe->xid, snapshot))
 			{
 				/*
@@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 			}
 			else if (TransactionIdDidCommit(qe->xid))
 			{
+#endif
 				/* qe->data is the null-terminated channel name */
 				char	   *channel = qe->data;
 
@@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					/* payload follows channel name */
 					char	   *payload = qe->data + strlen(channel) + 1;
 
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+					NotifyMyFrontEndMy(channel, payload, qe->srcPid);
 				}
+#if 0
 			}
 			else
 			{
@@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 * ignore its notifications.
 				 */
 			}
+#endif
 		}
 
 		/* Loop back if we're not at end of page */
@@ -2271,6 +2300,7 @@ static void
 ProcessIncomingNotify(void)
 {
 	/* We *must* reset the flag */
+	bool idle = !IsTransactionOrTransactionBlock();
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
@@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)
 	 * We must run asyncQueueReadAllNotifications inside a transaction, else
 	 * bad things happen if it gets an error.
 	 */
+	if (idle)
 	StartTransactionCommand();
 
 	asyncQueueReadAllNotifications();
 
+	if (idle)
 	CommitTransactionCommand();
 
 	/*
@@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)
  * Send NOTIFY message to my front end.
  */
 void
-NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
+NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)
 {
 	if (whereToSendOutput == DestRemote)
 	{

      
      



Now we create an extension that provides the SQL functions:





pg_async--1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_async" to load this file. \quit

-- Function counterparts of LISTEN / UNLISTEN / UNLISTEN *.
-- These stay STRICT: a NULL channel (including the omitted default) makes the
-- call return NULL without invoking the C function.
CREATE FUNCTION pg_listen(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_listen' LANGUAGE C;
CREATE FUNCTION pg_unlisten(channel pg_catalog.text default null) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten' LANGUAGE C;
CREATE FUNCTION pg_unlisten_all() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME', 'pg_async_unlisten_all' LANGUAGE C;

-- Introspection helpers mirroring the built-in functions of the same name.
CREATE FUNCTION pg_listening_channels() RETURNS setof pg_catalog.text STRICT AS 'MODULE_PATHNAME', 'pg_async_listening_channels' LANGUAGE C;
CREATE FUNCTION pg_notification_queue_usage() RETURNS pg_catalog.float8 STRICT AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage' LANGUAGE C;

-- pg_notify is deliberately NOT declared STRICT.  With STRICT, a call such as
-- pg_notify('channel') (payload left at its NULL default) would return NULL
-- without ever reaching the C code, so no notification would be sent.  The C
-- implementation forwards to pg_notify / pg_notify_my, which handle NULL
-- arguments themselves.
CREATE FUNCTION pg_notify(channel pg_catalog.text default null, payload pg_catalog.text default null) RETURNS pg_catalog.void AS 'MODULE_PATHNAME', 'pg_async_notify' LANGUAGE C;
      
      



Here, alongside the standard pg_listening_channels, pg_notification_queue_usage and pg_notify, we add the convenience functions pg_listen, pg_unlisten and pg_unlisten_all, which mirror the LISTEN, UNLISTEN and UNLISTEN * commands.





We implement these functions by calling the original functions on the primary and, on the replica, the corresponding functions from the modified copy of async.c:





pg_async.c
/*
 * EXTENSION(function) -- boilerplate for one SQL-callable C function.
 *
 * Expands to the function's prototype, its PG_FUNCTION_INFO_V1() entry and
 * the head of its definition; the caller supplies the body in braces right
 * after the macro invocation.
 */
#define EXTENSION(function) \
    Datum (function)(PG_FUNCTION_ARGS); \
    PG_FUNCTION_INFO_V1(function); \
    Datum (function)(PG_FUNCTION_ARGS)

/*
 * pg_async_listen -- function form of LISTEN.
 *
 * A NULL argument is treated as the empty channel name.  In a read-only
 * transaction (XactReadOnly) the request goes to the renamed copy
 * Async_Listen_My(); otherwise it goes to the stock Async_Listen().
 */
EXTENSION(pg_async_listen) {
    const char *channel = "";

    if (!PG_ARGISNULL(0))
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (XactReadOnly)
        Async_Listen_My(channel);
    else
        Async_Listen(channel);

    PG_RETURN_VOID();
}

/*
 * pg_async_listening_channels -- forwards the call, fcinfo untouched, to
 * pg_listening_channels_my() in a read-only transaction and to the stock
 * pg_listening_channels() otherwise.
 */
EXTENSION(pg_async_listening_channels) {
    if (XactReadOnly)
        return pg_listening_channels_my(fcinfo);

    return pg_listening_channels(fcinfo);
}

/*
 * pg_async_notification_queue_usage -- forwards the call, fcinfo untouched,
 * to pg_notification_queue_usage_my() in a read-only transaction and to the
 * stock pg_notification_queue_usage() otherwise.
 */
EXTENSION(pg_async_notification_queue_usage) {
    if (XactReadOnly)
        return pg_notification_queue_usage_my(fcinfo);

    return pg_notification_queue_usage(fcinfo);
}

/*
 * pg_async_notify -- forwards the call, fcinfo untouched, to pg_notify_my()
 * in a read-only transaction and to the stock pg_notify() otherwise.
 */
EXTENSION(pg_async_notify) {
    if (XactReadOnly)
        return pg_notify_my(fcinfo);

    return pg_notify(fcinfo);
}

/*
 * pg_async_unlisten_all -- function form of UNLISTEN *.
 *
 * Uses Async_UnlistenAll_My() in a read-only transaction and the stock
 * Async_UnlistenAll() otherwise.
 */
EXTENSION(pg_async_unlisten_all) {
    if (XactReadOnly)
        Async_UnlistenAll_My();
    else
        Async_UnlistenAll();

    PG_RETURN_VOID();
}

/*
 * pg_async_unlisten -- function form of UNLISTEN.
 *
 * A NULL argument is treated as the empty channel name.  In a read-only
 * transaction (XactReadOnly) the request goes to the renamed copy
 * Async_Unlisten_My(); otherwise it goes to the stock Async_Unlisten().
 */
EXTENSION(pg_async_unlisten) {
    const char *channel = "";

    if (!PG_ARGISNULL(0))
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (XactReadOnly)
        Async_Unlisten_My(channel);
    else
        Async_Unlisten(channel);

    PG_RETURN_VOID();
}

      
      



We also register hooks for utility-command execution, for transaction events, and for shared memory:





pg_async.c
/*
 * Hook values that were installed before this module replaced them.
 * _PG_init() saves them, _PG_fini() puts them back, and the module's own
 * hooks call them (when non-NULL) to keep the hook chain intact.
 */
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL;
static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;

/*
 * _PG_init -- module load hook.
 *
 * Does nothing unless the library is being loaded while
 * process_shared_preload_libraries_in_progress is set.  Otherwise it chains
 * itself into the ProcessUtility and shmem-startup hooks, requests the
 * shared memory reported by AsyncShmemSizeMy(), and registers the
 * (sub)transaction callbacks.
 */
void _PG_init(void);

void
_PG_init(void)
{
    if (!process_shared_preload_libraries_in_progress)
        return;

    /* Remember the previous hooks, then install ours in front of them. */
    pg_async_shmem_startup_hook_original = shmem_startup_hook;
    pg_async_ProcessUtility_hook_original = ProcessUtility_hook;
    shmem_startup_hook = pg_async_shmem_startup_hook;
    ProcessUtility_hook = pg_async_ProcessUtility_hook;

    /* Reserve room for the private notification queue's shared state. */
    RequestAddinShmemSpace(AsyncShmemSizeMy());

    /* Drive the renamed async.c entry points from transaction events. */
    RegisterSubXactCallback(pg_async_SubXactCallback, NULL);
    RegisterXactCallback(pg_async_XactCallback, NULL);
}

void _PG_fini(void); void _PG_fini(void) {
    ProcessUtility_hook = pg_async_ProcessUtility_hook_original;
    shmem_startup_hook = pg_async_shmem_startup_hook_original;
    UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);
    UnregisterXactCallback(pg_async_XactCallback, NULL);
}

      
      



In the shared-memory hook, we initialize the shared memory area using the function from the modified copy of async.c:





pg_async.c
static void pg_async_shmem_startup_hook(void) {
    if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();
    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
    AsyncShmemInitMy();
    LWLockRelease(AddinShmemInitLock);
}

      
      



In the transaction hook, when running on the replica, we call the corresponding functions from the modified copy of async.c:





pg_async.c
/*
 * Transaction callback: maps transaction events to the renamed async.c
 * entry points.
 *
 * Only acts in read-only transactions (XactReadOnly); every event other
 * than the four handled below is ignored.  The arg parameter is unused.
 */
static void
pg_async_XactCallback(XactEvent event, void *arg)
{
    if (!XactReadOnly)
        return;

    if (event == XACT_EVENT_PRE_COMMIT)
    {
        PreCommit_Notify_My();
    }
    else if (event == XACT_EVENT_COMMIT)
    {
        AtCommit_Notify_My();
        ProcessCompletedNotifiesMy();
    }
    else if (event == XACT_EVENT_ABORT)
    {
        AtAbort_Notify_My();
    }
    else if (event == XACT_EVENT_PREPARE)
    {
        AtPrepare_Notify_My();
    }
}

      
      



In the command-execution hook, when running on the replica, we handle the LISTEN, UNLISTEN and NOTIFY commands by calling the corresponding functions from the modified copy of async.c:





pg_async.c
static void CheckRestrictedOperation(const char *cmdname) {
    if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname)));
}

/*
 * Hand a utility statement on to the next ProcessUtility implementation:
 * the hook that was installed before ours if there is one, otherwise
 * standard_ProcessUtility().
 */
static void
pg_async_ProcessUtility_next(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc)
{
    if (pg_async_ProcessUtility_hook_original)
        pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc);
    else
        standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);
}

/*
 * ProcessUtility hook.
 *
 * Outside read-only transactions every statement is passed straight down the
 * hook chain.  In a read-only transaction LISTEN, NOTIFY and UNLISTEN are
 * executed here with the renamed async.c entry points instead of being
 * passed on; all other statements still go down the chain.
 *
 * NOTE(review): XactReadOnly is what stands in for "running on a replica"
 * here; it is presumably also set for explicitly read-only transactions on a
 * primary -- confirm that routing those to the private queue is intended.
 */
static void
pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc)
{
    Node *parsetree = pstmt->utilityStmt;

    if (!XactReadOnly)
    {
        pg_async_ProcessUtility_next(pstmt, queryString, context, params, queryEnv, dest, qc);
        return;
    }

    check_stack_depth();

    switch (nodeTag(parsetree))
    {
        case T_ListenStmt:
            {
                ListenStmt *stmt = (ListenStmt *) parsetree;

                CheckRestrictedOperation("LISTEN");
                Async_Listen_My(stmt->conditionname);
            }
            break;

        case T_NotifyStmt:
            {
                NotifyStmt *stmt = (NotifyStmt *) parsetree;

                Async_Notify_My(stmt->conditionname, stmt->payload);
            }
            break;

        case T_UnlistenStmt:
            {
                UnlistenStmt *stmt = (UnlistenStmt *) parsetree;

                CheckRestrictedOperation("UNLISTEN");
                /* A missing channel name means UNLISTEN *. */
                if (stmt->conditionname)
                    Async_Unlisten_My(stmt->conditionname);
                else
                    Async_UnlistenAll_My();
            }
            break;

        default:
            /* Not ours: let the rest of the chain handle it, and skip the
             * CommandCounterIncrement() below, as before. */
            pg_async_ProcessUtility_next(pstmt, queryString, context, params, queryEnv, dest, qc);
            return;
    }

    CommandCounterIncrement();
}

      
      



→ All this can be viewed in the repository








All Articles