Python利用非阻塞式UDP服务实现进程间通讯

发布于 / Python / Comments Off on Python利用非阻塞式UDP服务实现进程间通讯

前段时间有个需求,项目上有两个不同的进程,一个是API接口进程,一个是内部服务的进程。两个进程共用一个白名单数据库,内部服务进程要处理大量的数据(数亿级),每次处理一行数据都要检测一下是否在白名单内,因此要做一个白名单缓存来加速运行。为了实现白名单同步,要求当API接收到修改白名单请求的时候,通知一下内部服务进程,刷新白名单缓存。

直接上代码:

import socket
from threading import Thread
from time import sleep

# 当前存在问题:
# 1,UDP无法保证数据包交付
# 2,UDP监听失败不能重试


class Client:
    def __init__(self, port=8742):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.host, self.port = "127.0.0.1", port

    def send_notify(self, msg):
        """
        发送通知
        :param msg:通知内容
        :return:
        """
        self.serv.sendto(msg.encode(), (self.host, self.port))


class Server:
    def __init__(self, callback, port=8742):
        self.host, self.port = "127.0.0.1", port
        self.thread_stop_sign = False
        self.callback = callback

    def server_handle(self, callback):
        """
        UDP处理函数,收到UDP报文时会调callback函数
        :param callback:回调函数
        :return:
        """
        serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        serv.bind((self.host, self.port))
        while True:
            message, client_address = serv.recvfrom(2048)
            message = message.decode()
            if message == "stop" and self.thread_stop_sign:
                self.thread_stop_sign = False
                serv.close()
                return
            callback(message)

    def start(self):
        """
        守护线程启动
        :return:线程成功启动,返回True,反之返回false
        """
        self.thread = Thread(target=self.server_handle, args=(self.callback,))
        self.thread.setDaemon(True)
        self.thread.start()

    def stop(self):
        """
        守护线程启动
        :return:线程关闭
        """
        self.thread_stop_sign = True
        serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        serv.sendto(b"stop", (self.host, self.port))

这样,在内部服务进程开一个UDP服务器,并设置好回调函数

def func(s):
    func_table = {
        "aaa": aaa,
        "bbb": bbb,
        ...    # 这里是个函数表,接受到某个字段就执行某个函数
    }
    if s in func_table.keys():
        func_table[s]()

server = Server()
server.start(callback = func)

然后在api接口的相关地方开一个Client

client = Client()

即可进行通讯

client.send_notify("aaa")  # 通知server执行aaa()

这里因为仅仅是简单的刷新白名单的需求,没有做队列,如有需要,可以修改server的代码,使用python的queue实现一个消息队列。

转载原创文章请注明,转载自: 斐斐のBlog » Python利用非阻塞式UDP服务实现进程间通讯
评论已关闭