昨天聊过,依托reactive-pg-client可以做很多传统JDBC无法实现的事情,比如PostgreSQL的消息推送(notifylisten)。有了这种功能,我们就可以轻易实现从数据库层主向业务逻辑代码推送消息的功能。可以说,又一次为我们打开了新世界的大门。

先来回顾下PostgreSQLnotifylisten

主要参考官方文档,其实非常简单,核心SQL就两句话:

  • 发消息:
    NOTIFY channel_name;
    NOTIFY是一个关键字,后面跟着的第一参数是,频道的名字,这个是用户随便定义的,只要之后跟LISTEN的保持一致即可。NOTIFY还有第二个可选参数,就是消息内容,类型也必须是字符串,并且长度限制在8000字节。
    NOTIFY channel_name message_body还有一种等效的写法,也就是
    SELECT pg_notify(channel_name,message_body)
    后者的好处是可以用到SQL的预编译特性。
  • 收消息:
    LISTEN channel_name;

这里补充几个知识点:

  • NOTIFY/LISTEN这些语法不是SQL 标准,属于PostgreSQL特有的功能
  • LISTEN相反的,有UNLISTEN可供使用
  • NOTIFY是广播模式,也就是所有同频道的LISTEN都能接收到信息
  • NOTIFY,再LISTEN是没有效果的。这个也符合直觉。
  • 想测试的话,最好借助PostgreSQL自带的psql环境,可以很容易测试,其他SQL客户端程序可能就没那么友好了。

有了PostgreSQL的基础知识储备,我们就可以用reactive-pg-client尝试一把了

直接上代码

pgClient.getConnection {
def conn = it.result()

conn.notificationHandler({ notification ->
println("Received ${notification.payload} on channel ${notification.channel}")
context.assertEquals(notification.payload, message)
context.assertEquals(notification.channel, channelName)
async.complete()
})

conn.preparedQuery("LISTEN $channelName", { ar ->
println("Subscribed to channel")

conn.preparedQuery('''select pg_notify($1,$2)''', Tuple.of(channelName, message), {})
})
}

没什么复杂的,通过reactive-pg-client拿到connection之后,先用这个connection注册个notificationHandler用来接收消息。但是此时还不够,还必须在同connection上执行LISTENSQL语句,才能保证之前的notificationHandler是有效的。至于NOTIFY语句并不要求非要用这个connection,这也是符合我们业务需求的。
以上完整代码,可以在我的github中找到。
上面我是在当前代码中,手动NOTIFY的,我们完全可以吧NOTIFY放在数据库的触发器、或者数据库定时任务中,就可以实现数据库到应用程序的数据推送了。另外经测试,NOTIFYLISTEN两个语法在JDBC环境下也是可以执行成功的(SQL不会报错),只是LISTEN不会有任何效果就是了。

明天,我将演示基于reactive-pg-client的数据库事务demo。