ZeroMQ简介

零、消息队列简介

消息队列是一种进程间的通信机制,用于在不同进程之间同步消息。通信期间,一个进程将消息放入该队列中,然后另一个进程就可以从该队列中取出这条消息。

消息队列可以是异步的,即发送方无需等待接收方的确认或回复就可以立即执行下一步的操作。

消息队列是一种缓冲机制,即使接收方当前无法处理某个消息,该消息也不会立即丢失,而是被存储在队列中。

消息队列的通信方式减少了进程间的耦合,提高了系统的可扩展性和可维护性。

消息队列还可以用于实现分布式的任务调度和负载均衡。

一、ZeroMQ简介

ZeroMZeroMQ,简称"zmq",是一种高效、开源的消息传递框架,它提供了多种消息传递模式和编程语言支持。相比于传统的Socket网络编程,ZeroMQ提供了更高层次的抽象,使得程序员能够更专注于业务逻辑的实现而非底层网络通信。

ZeroMQ提供了多种消息传递模式,包括Request-Reply、Publish-Subscribe、Push-Pull等。这些模式可用于不同的场景,例如,Request-Reply适用于客户端与服务器之间的交互,Publish-Subscribe适用于发布-订阅模式,Push-Pull适用于任务分发和负载均衡等。

在ZeroMQ中,消息是通过Socket进行发送和接收的,ZeroMQ支持多种Socket类型。

ZeroMQ支持多种编程语言,包括C/C++、Java、Python等,这使得不同语言编写的应用程序之间可以互相通信,进而可以实现跨平台或者跨设备的数据传输。

Request-Reply模式

用于服务端和客户端的直接通信。

客户端发送请求,服务端接收请求并给出响应。

Request-Reply模式是一种基于请求和回复的模式。在这种模式下,一个节点(客户端)向另一个节点(服务端)发送一个请求,然后等待回复。这种模式通常用于需要明确的请求-响应交互的情况,例如,当你需要从服务端获取一些数据或者执行一些操作时。这种模式下,每个请求都会得到一个响应,而且请求和响应是一一对应的。

Publish-Subscribe模式

想我们订阅微信公众号那样

Push-Pull模式

ZeroMQ的Push-Pull模式,也被称为管道模式,是一种可以在分布式系统中进行负载均衡的模式。在这种模式中,生产者(Push)将消息发送到队列,而消费者(Pull)从队列中取出并处理消息。

这个模式的一个主要优点是它可以轻松地处理大量的工作负载,因为你可以简单地添加更多的消费者来处理更多的工作。

举个例子,假设我们有一个需要处理大量图片的应用程序。在这个系统中,我们可以有一个生产者节点,它将每个图片任务作为消息发送到队列。然后我们可以有多个消费者节点,每个节点从队列中拉取图片任务并处理。

当生产者生成一个新的图片任务时,它会使用PUSH套接字将任务推送到队列。ZeroMQ会自动选择一个消费者并将任务发送给它。消费者使用PULL套接字从队列中拉取任务。

如果一个消费者完成了它的当前任务并准备好处理下一个任务,它会从队列中拉取下一个任务。如果所有的消费者都在忙,新的任务将在队列中等待,直到有消费者可用。

这就是ZeroMQ的Push-Pull模式。通过使用这种模式,你可以轻松地在多个节点之间分发任务,并且可以根据需要添加更多的消费者节点。

Push-Pull模式Request-Reply模式区别

Push-Pull模式和Request-Reply模式的主要区别在于,Push-Pull模式适用于需要分发大量工作负载到多个消费者的情况,而Request-Reply模式适用于需要明确的请求-响应交互的情况。

二、ZeroMQ的特点

  1. 高性能:ZeroMQ具有出色的性能表现,能够实现高速、低延迟的消息传递。这得益于其底层使用了高效的传输协议和算法。
  2. 简单易用:ZeroMQ的API设计简洁,易于上手。它为不同的编程语言提供了丰富的绑定,使开发者能够快速地在各种环境中集成ZeroMQ。
  3. 丰富的通信模式:ZeroMQ支持多种通信模式,如发布/订阅、请求/响应、推/拉、路由等。这使得开发者可以根据实际需求灵活地组织系统的通信结构。
  4. 无中心化架构:ZeroMQ采用了去中心化的设计,无需依赖于单一的消息代理。这有助于降低系统的复杂性,提高可扩展性和容错性。
  5. 跨平台和多语言支持:ZeroMQ支持多种操作系统,如Linux、Windows和macOS等。同时,它为许多编程语言提供了绑定,如C、C++、Python、Java、Ruby等。

三、ZeroMQ的使用场景

  1. 微服务架构:在微服务架构中,ZeroMQ可用于构建高性能、松耦合的服务间通信。
  2. 实时数据传输:ZeroMQ适合实时数据传输场景,如金融交易、游戏、物联网设备通信等。
  3. 分布式计算:ZeroMQ可以用于构建分布式计算平台,实现任务分发、结果收集等功能。
  4. 日志和监控:借助ZeroMQ的发布/订阅模式,可实现实时日志收集、分析和监控。【解释:对于日志的收集和分析,我们可以创建一个或多个订阅者节点。这些节点订阅发布者节点的日志信息,一旦有新的日志发布,这些订阅者节点就会接收到日志信息。订阅者节点可以实时收集并分析这些日志,例如,它们可以监控错误日志的出现,或者统计某种类型的日志的数量。】

四、ZeroMQ架构

ZeroMQ使用了一种称为“套接字(Socket)”的抽象,将不同类型的通信模式统一为一种简单的接口。在ZeroMQ中,套接字可以承担多种角色,如发布者、订阅者、请求者、响应者等。通过将这些套接字连接起来,可以轻松地构建出各种复杂的通信模式。

ZeroMQ的核心组件包括:

  1. 套接字:ZeroMQ的基本通信单元,提供了统一的发送和接收消息的接口。
  2. 传输协议:ZeroMQ支持多种传输协议,如TCP、IPC(进程间通信)和In-Process(内部通信)等。
  3. 消息队列和缓冲区:ZeroMQ内部使用了消息队列和缓冲区来存储和管理消息。这有助于实现高效、可靠的消息传递。
  4. 设备:ZeroMQ的设备是一种特殊的实体,用于在不同的套接字之间传输消息。设备可以用来实现消息的路由、分发、聚合等功能。

ZeroMQ与其他消息队列技术的比较

  1. 与RabbitMQ的比较:RabbitMQ是一个重量级、功能丰富的消息队列代理,适用于企业级应用。相比之下,ZeroMQ更轻量级、灵活,适用于高性能、低延迟的场景。
  2. 与Kafka的比较:Kafka是一个分布式、高吞吐量的消息队列系统,主要用于实时数据流处理和大数据应用。与ZeroMQ相比,Kafka更强大、更适合大规模数据处理场景。
  3. 与NATS的比较:NATS是一个简单、高性能的发布/订阅和请求/响应消息系统。与ZeroMQ相似,NATS同样注重性能和简单性,但在通信模式和API设计上有所不同。

六、总结

ZeroMQ是一个高性能、低延迟的消息队列库,适用于构建分布式系统中的各种通信模式。其简单易用的API、丰富的通信模式、无中心化架构等特点使其在许多场景中成为理想的选择。了解ZeroMQ的特点和使用场景,将有助于您更好地评估其在您的项目中的适用性,并为您的应用程序提供强大、灵活的通信解决方案。

ZeroMQ快速上手

基本用法

1.创建zmq上下文

1
zmq::context_t context(1);

2.创建zmq通信期间的socket套接字

server端:

1
zmq::socket_t socket(context, ZMQ_REP)

client端:

1
zmq::socket_t socket(context, ZMQ_REQ);
  1. 绑定或连接到对应的socket

server端:

1
socket.bind("tcp://*:5555");

client端:

1
socket.connect("tcp://localhost:5555");
  1. 发送或接收消息
1
2
3
4
5
zmq::message_t msg(5); 
memcpy(msg.data(), "hello", 5);
socket.send(msg);
zmq::message_t reply;
socket.recv(&reply);

5.关闭socket和zmq上下文,释放资源

1
2
socket.close(); 
context.close();

常用函数接口

zmq_ctx_new:创建zmq上下文对象。

zmq_socket:创建zmq套接字对象。

zmq_bind:将套接字绑定到指定端口上。

zmq_connect:将套接字连接到指定端口上。

zmq_send:往套接字上发送消息。

zmq_recv:从套接字上接收消息。

zmq_poll:等待多个套接字上的事件。

zmq_msg_init:初始化空的zmq消息。

zmq_msg_send:往套接字上发送消息,支持更复杂的操作。

zmq_msg_recv:从套接字上接收消息,支持更复杂的操作。

1.zmq消息的构造

1
2
3
4
5
6
7
8
9
//创建空的zmq消息
zmq::message_t msg;

//给消息分配内存空间
const size_t size = 1024;
zmq::message_t msg(size);

//使用外部数据初始化消息
zmq::message_t msg("hello world!", 12);

2.发送zmq消息

1
2
3
4
5
zmq::message_t msg = ...;

auto res = sock.send(msg, zmq::send_flags::none);
auto res = sock.send(std::move(msg), zmq::send_flags::none);
auto res = sock.send(zmq::str_buffer("hello world"), zmq::send_flags::none);

3.接收zmq消息

1
2
auto res = sock.recv(msg, zmq::recv_flags::none);
auto res = sock.recv(buf, zmq::recv_flags::none);

4.设置或读取套接字属性

1
2
3
4
sock.set(zmq::sockopt::immediate, false);
sock.set(zmq::sockopt::routing_id, "100");

auto rid = sock.get(zmq::sockopt::routing_id);

5.poller轮询器操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
zmq::poller_t<> in_poller, out_poller;
//创建两个输入poller,一个输出poller
in_poller.add(input_socket1, zmq::event_flags::pollout);
in_poller.add(input_socket2, zmq::event_flags::pollout);
out_poller.add(output_socket, zmq::event_flags::pollout);

const std::chrono::milliseconds timeout{100};
std::vector<zio::poller_event<>> in_events(2);
std::vector<zio::poller_event<>> out_events(1);
while (true) {
const auto nin = in_poller.wait_all(in_events, timeout);
if (!nin) {
std::cout << "input timeout, try again" << std::endl;
continue;
}
for (int ind=0; ind<nin; ++ind) {
zmq::message_t msg;
auto rres = in_events[ind].socket.recv(msg, zmq::recv_flags::none);


const auto nout = out_poller.wait_all(out_events, timeout);
if (!nout) {
std::cout << "output timeout, freakout" << std::endl;
abort();
}
auto sres = out_events[0].socket.send(msg, zmq::send_flags::none);
}
}

Linux安装

1.zmq的Linux版本安装

下载官方发行的Linux版本zmq代码,下载完成后在本地编译生成依赖库和头文件。

下载地址1:

https://github.com/zeromq/zeromq4-1/releases

进入zmq代码目录,使用以下命令进行编译:

1
2
3
4
sh autogen.sh
./configure
make
make install

运行完以上命令,不配置自定义路径的时候,会在"/user/local/"下生成对应的so文件和头文件。

1
2
3
4
5
6
7
8
9
10
├── include
│ ├── zmq.h
│ └── zmq_utils.h
├── lib
│ ├── libzmq.a
│ ├── libzmq.la
│ ├── libzmq.so -> libzmq.so.5.0.3
│ ├── libzmq.so.5 -> libzmq.so.5.0.3
│ ├── libzmq.so.5.0.3
│ └── pkgconfig

注:"/user/local/“路径很容易被编译器找到,因此,编译的时候,只需要在gcc或g++命令后面加上”-lzmq"参数即可。

2.C语言版本的zmq集成

a.操作步骤:

完成以上安装即可。

b.引入的头文件:

1
include <zmq.h>

3.C++语言版本的zmq集成

a.操作步骤:

1.完成以上安装。

2.下载并解压官方的cppzmq压缩包,从中拷贝需要依赖的hpp头文件到之前的include目录中。

下载地址2:

https://github.com/zeromq/cppzmq/archive/master.zip

1
2
cp zmq.hpp /usr/local/include/
cp zmq_addon.hpp /usr/local/include/

b.引入的头文件:

zmq.hpp: 包含zmq消息、上下文、缓冲区、套接字、监视器、轮询器等的具体实现。

zmq_addon.hpp:zeromq库的扩展,包含更多高级功能以及另一种形式的轮询器的实现。

1
2
3
4
5
include <zmq.h>
include <zmq.hpp>

//实现复杂功能会需要zmq_addon.hpp
//#include <zmq_addon.hpp>

4.完整的集成过程

a.项目结构:

1
2
3
4
── zmq_demo
├── CMakeLists.txt
├── zmq_client.cpp
└── zmq_server.cpp

b.CMakeLists.txt配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
cmake_minimum_required(VERSION 3.5)
project(zmq_demo)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
include_directories(/user/local/include)
link_directories(/user/local/lib)

add_compile_options(-Wno-error=unused-parameter)

#Add executable
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
add_executable(zmq_server zmq_server.cpp)
add_executable(zmq_client zmq_client.cpp)

#Link zmq library
target_link_libraries(zmq_server -lzmq)
target_link_libraries(zmq_client -lzmq)

Request-Reply模式

c.服务端代码:zmq_server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <zmq.h>
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

std::string s_recv(zmq::socket_t & socket, int flags = 0) {
zmq::message_t message;
auto recv_flags = (flags ==0)? zmq::recv_flags::none: zmq::recv_flags::dontwait;
(void)socket.recv(message, recv_flags);

return std::string(static_cast<char*>(message.data()), message.size());
}

bool s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {
zmq::message_t message(string.size());
memcpy (message.data(), string.data(), string.size());

bool rc = socket.send (message, static_cast<zmq::send_flags>(flags)).has_value();
return (rc);
}

int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind("tcp://*:5555");

int count=0;
while (true)
{
std::string recvStr;
recvStr = s_recv(socket);
std::cout << "server Received " << recvStr << " " << count << std::endl;
sleep(1);

//Send reply back to client
std::string sendStr = "World";
s_send(socket, sendStr);
std::cout << "server Send " << sendStr << " " << count << std::endl;
count++;
}
return 0;
}

d.客户端代码:zmq_client.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <zmq.hpp>
#include <zmq.h>
#include <string>
#include <iostream>

std::string s_recv(zmq::socket_t & socket, int flags = 0) {
zmq::message_t message;
auto recv_flags = (flags ==0)? zmq::recv_flags::none: zmq::recv_flags::dontwait;
(void)socket.recv(message, recv_flags);

return std::string(static_cast<char*>(message.data()), message.size());
}

bool s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {
zmq::message_t message(string.size());
memcpy (message.data(), string.data(), string.size());

bool rc = socket.send (message, static_cast<zmq::send_flags>(flags)).has_value();
return (rc);
}

int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REQ);

std::cout << "Connecting to hello world server..." << std::endl;
socket.connect("tcp://localhost:5555");

for (int request_nbr = 0; request_nbr != 6; request_nbr++)
{
std::string sendStr = "Hello";
s_send(socket, sendStr);
std::cout << "client Send " << sendStr << " " << request_nbr << std::endl;

//Get the reply.
std::string recvStr;
recvStr = s_recv(socket);
std::cout << "client Received " << recvStr << " " << request_nbr << std::endl;
}
return 0;
}

编译过程:

1
2
3
4
5
6
7
8
Scanning dependencies of target zmq_server
[ 25%] Building CXX object CMakeFiles/zmq_server.dir/zmq_server.cpp.o
[ 50%] Linking CXX executable ../bin/zmq_server
[ 50%] Built target zmq_server
Scanning dependencies of target zmq_client
[ 75%] Building CXX object CMakeFiles/zmq_client.dir/zmq_client.cpp.o
[100%] Linking CXX executable ../bin/zmq_client
[100%] Built target zmq_client

运行结果:

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
(base) sv@sv-NF5280M5:/home/sv/pengeHome/cppzmq-master/zmq_demo/bin$ ./zmq_server
server Received Hello 0
server Send World 0
server Received Hello 1
server Send World 1
server Received Hello 2
server Send World 2
server Received Hello 3
server Send World 3
server Received Hello 4
server Send World 4
server Received Hello 5
server Send World 5

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
root@ubuntu:/home/zmq_demo/bin# ./zmq_server
server Received Hello 0
server Send World 0
server Received Hello 1
server Send World 1
server Received Hello 2
server Send World 2
server Received Hello 3
server Send World 3
server Received Hello 4
server Send World 4
server Received Hello 5
server Send World 5

发布者-订阅者模式

发布者端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <string>
#include <unistd.h>
#include <zmq.h>
#include <zmq.hpp>

int main()
{
// Create ZMQ Context
zmq::context_t context ( 1 );
// Create the Publish socket
zmq::socket_t publisher ( context, ZMQ_PUB );
// Bind to a tcp socket
publisher.bind( "tcp://*:5556" );

usleep( 1000000 );
// Message to send to the subscribers
std::string msg = "msg from [pub]";

// loop 6 times
for ( int i = 1; i <= 6; i++ )
{
// Create zmq message
zmq::message_t request( msg.length() );
// Copy contents to zmq message
memcpy( request.data(), msg.c_str(), msg.length() );
// Publish the message
publisher.send( request );
std::cout << "sending: " << i << std::endl;
}
}

订阅者端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <string>
#include <zmq.h>
#include <zmq.hpp>

int main()
{
// Create ZMQ Context
zmq::context_t context ( 1 );
// Create the Subscribe socket
zmq::socket_t subscriber ( context, ZMQ_SUB );
// Connect to a tcp socket
subscriber.connect( "tcp://localhost:5556" );
// Set the socket option to subscribe
subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0 );

// infinite loop to receive messages
for ( int i = 1; i > 0; i++ )
{
// Receive the message and convert to string
zmq::message_t update;
subscriber.recv( &update );
std::string msg = update.to_string();
// Print the message
std::cout << "Num: " << i << ", message: " << msg << std::endl;
}
}

运行结果:

发布者端:

1
2
3
4
5
6
7
root@ubuntu:/home/zmq_demo/bin# ./zmq_pub
sending: 1
sending: 2
sending: 3
sending: 4
sending: 5
sending: 6

订阅者端:

1
2
3
4
5
6
7
root@ubuntu:/home/zmq_demo/bin# ./zmq_sub
Num: 1, message: msg from [pub]
Num: 2, message: msg from [pub]
Num: 3, message: msg from [pub]
Num: 4, message: msg from [pub]
Num: 5, message: msg from [pub]
Num: 6, message: msg from [pub]