本文采用知识共享署名 4.0 国际许可协议进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,可适当缩放并在引用处附上图片所在的文章链接。
上一篇文章
介绍了ubus的组件和实现原理,本文通过代码实例介绍使用ubus进行进程间通信的三种方式。
invoke的方式实现端对端通信
最简单的情景就是一个提供服务的server端,一个请求服务的client端,client请求server的服务。
下面的例子中,server注册了一个名为“scan_prog”的对象,该对象中提供一个“scan”方法:
ubus_invoke.h:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#ifndef __UBUS_INVOKE_H__
#define __UBUS_INVOKE_H__
#include <json/json.h>
#include <libubox/blobmsg_json.h>
struct prog_attr
{
char name[64];
int chn_id;
};
#define PROG_MAX 8
#endif /* __UBUS_INVOKE_H__ */
|
invoke_server.c:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubus.h>
#include <json/json.h>
#include <libubox/blobmsg_json.h>
#include "ubus_invoke.h"
static struct ubus_context * ctx = NULL;
static struct blob_buf b;
static const char * sock_path;
static struct prog_attr uri_list[PROG_MAX] =
{
{"program_beijing", 1},
{"program_guangzhou", 2},
{"program_shanghai", 3},
{"", -1},
};
enum
{
SCAN_CHNID,
SCAN_POLICY_MAX,
};
static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = {
[SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32},
};
static int ubus_start_scan(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req, const char *method,
struct blob_attr *msg)
{
int ret = -1;
void * json_list = NULL;
void * json_uri = NULL;
int idx;
blob_buf_init(&b, 0);
struct blob_attr *tb[SCAN_POLICY_MAX];
blobmsg_parse(scan_policy, SCAN_POLICY_MAX, tb, blob_data(msg), blob_len(msg));
/*
本例子中,如果请求特定的节目号,返回节目名称。
如果请求节目号是0,则返回所有节目列表。
*/
if (tb[SCAN_CHNID] != NULL)
{
int chnid = blobmsg_get_u32(tb[SCAN_CHNID]);
if (chnid == 0)
{
json_uri = blobmsg_open_array(&b, "prog_list");
for (idx = 0; idx < PROG_MAX; idx++)
{
if ('\0' != uri_list[idx].name[0])
{
json_list = blobmsg_open_table(&b, NULL);
blobmsg_add_string(&b, "name", uri_list[idx].name);
blobmsg_add_u32(&b, "channel", uri_list[idx].chn_id);
blobmsg_close_table(&b, json_list);
}
}
blobmsg_close_array(&b, json_uri);
ret = 0;
}
else
{
for (idx = 0; idx < PROG_MAX; idx++)
{
if ('\0' != uri_list[idx].name[0] && uri_list[idx].chn_id == chnid)
{
blobmsg_add_string(&b, "name", uri_list[idx].name);
ret = 0;
}
}
}
}
blobmsg_add_u32(&b, "result", ret);
ubus_send_reply(ctx, req, b.head);
return 0;
}
/* 方法列表 */
static struct ubus_method scan_methods[] =
{
UBUS_METHOD("scan", ubus_start_scan, scan_policy),
};
/* type目前没有实际用处 */
static struct ubus_object_type scan_obj_type
= UBUS_OBJECT_TYPE("scan_prog", scan_methods);
static struct ubus_object scan_obj =
{
.name = "scan_prog", /* 对象的名字 */
.type = &scan_obj_type,
.methods = scan_methods,
.n_methods = ARRAY_SIZE(scan_methods),
};
static void ubus_add_fd(void)
{
ubus_add_uloop(ctx);
#ifdef FD_CLOEXEC
fcntl(ctx->sock.fd, F_SETFD,
fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);
#endif
}
static void ubus_reconn_timer(struct uloop_timeout *timeout)
{
static struct uloop_timeout retry =
{
.cb = ubus_reconn_timer,
};
int t = 2;
if (ubus_reconnect(ctx, sock_path) != 0) {
uloop_timeout_set(&retry, t * 1000);
return;
}
ubus_add_fd();
}
static void ubus_connection_lost(struct ubus_context *ctx)
{
ubus_reconn_timer(NULL);
}
static int display_ubus_init(const char *path)
{
uloop_init();
sock_path = path;
ctx = ubus_connect(path);
if (!ctx)
{
printf("ubus connect failed\n");
return -1;
}
printf("connected as %08x\n", ctx->local_id);
ctx->connection_lost = ubus_connection_lost;
ubus_add_fd();
/* 向ubusd注册对象和方法,供其他ubus客户端调用。 */
if (ubus_add_object(ctx, &scan_obj) != 0)
{
printf("ubus add obj failed\n");
return -1;
}
return 0;
}
static void display_ubus_done(void)
{
if (ctx)
ubus_free(ctx);
}
int main(int argc, char * argv[])
{
char * path = NULL;
if (-1 == display_ubus_init(path))
{
printf("ubus connect failed!\n");
return -1;
}
uloop_run();
display_ubus_done();
return 0;
}
|
客户端代码invoke_client.c去调用server端的"scan_prog"对象的"scan"方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubus.h>
#include <json/json.h>
#include <libubox/blobmsg_json.h>
#include "ubus_invoke.h"
static struct ubus_context * ctx = NULL;
static struct blob_buf b;
static const char * cli_path;
enum
{
SCAN_CHNID,
SCAN_POLICY_MAX,
};
static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = {
[SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32},
};
static int timeout = 30;
static bool simple_output = false;
static void scanreq_prog_cb(struct ubus_request *req, int type, struct blob_attr *msg)
{
char *str;
if (!msg)
return;
/*
在这里处理返回的消息。
本例子只是将返回的消息打印出来。
*/
str = blobmsg_format_json_indent(msg, true, simple_output ? -1 : 0);
printf("%s\n", str);
free(str);
}
static int client_ubus_call()
{
unsigned int id;
int ret;
blob_buf_init(&b, 0);
/* 需要传递的参数 */
blobmsg_add_u32(&b, scan_policy[SCAN_CHNID].name, 0);
/*
向ubusd查询是否存在"scan_prog"这个对象,
如果存在,返回其id
*/
ret = ubus_lookup_id(ctx, "scan_prog", &id);
if (ret != UBUS_STATUS_OK) {
printf("lookup scan_prog failed\n");
return ret;
}
else {
printf("lookup scan_prog successs\n");
}
/* 调用"scan_prog"对象的"scan"方法 */
return ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000);
}
/*
ubus_invoke()的声明如下:
int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method,
struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout);
其中cb参数是消息回调函数,其函数类型定义为:
typedef void (*ubus_data_handler_t)(struct ubus_request *req,
int type, struct blob_attr *msg);
参数type是请求消息的类型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。
参数msg存放从服务端得到的回复消息,struct blob_attr类型,同样用blobmsg_parse()来解析。
参数req保存了请求方的消息属性,其中req->priv即ubus_invoke()中的priv参数,
用这个参数可以零活的传递一些额外的数据。
*/
static int client_ubus_init(const char *path)
{
uloop_init();
cli_path = path;
ctx = ubus_connect(path);
if (!ctx)
{
printf("ubus connect failed\n");
return -1;
}
printf("connected as %08x\n", ctx->local_id);
return 0;
}
static void client_ubus_done(void)
{
if (ctx)
ubus_free(ctx);
}
int main(int argc, char * argv[])
{
/* ubusd创建的unix socket,默认值为"/var/run/ubus.sock" */
char * path = NULL;
/* 连接ubusd */
if (UBUS_STATUS_OK != client_ubus_init(path))
{
printf("ubus connect failed!\n");
return -1;
}
/* 调用某个ubus方法并处理返回结果 */
client_ubus_call();
client_ubus_done();
return 0;
}
|
先执行server程序,再执行client程序,可以看到client发出请求后,
server返回了相应的节目号,在client打印出了接收到的消息。
也可以通过shell命令来请求server的服务,例如:
ubus call scan_prog scan '{"chnID": 0}'
这条命令和执行client程序的作用相同。
subscribe/notify的方式实现订阅
ubus支持以订阅的方式进行进程间通信,通信方式如下图:

被订阅者(server)又称为notifier,订阅者(client)又称为subscriber。
这样一来,可以同时有多个client订阅server的某个服务,当server通过该服务广播消息时,
所有client都会被通知,并执行各自的处理。
主要过程为:
server进程:
- 连接ubusd。
- 注册一个object,用于client订阅。
- server可以向订阅者广播消息。
- 等待消息。
client进程:
- 连接ubusd。
- 向server订阅object,并定义收到server的消息时的处理函数。
- 等待消息。
代码示例:下面这个例子中,server注册了一个名为“test”的对象,并定期广播消息。
而client去订阅这个对象,并对server发出的消息做处理。
notify_server.c:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
#include <unistd.h>
#include <libubox/blobmsg_json.h>
#include <libubox/uloop.h>
#include <libubus.h>
static struct ubus_context *ctx;
static void test_client_subscribe_cb(struct ubus_context *ctx, struct ubus_object *obj)
{
fprintf(stderr, "Subscribers active: %d\n", obj->has_subscribers);
}
/* 这个空的method列表,只是为了让object有个名字,这样client可以通过object name来订阅。 */
static struct ubus_method test_methods[] = {};
static struct ubus_object_type test_obj_type =
UBUS_OBJECT_TYPE("test", test_methods);
static struct ubus_object test_object = {
.name = "test", /* object的名字 */
.type = &test_obj_type,
.subscribe_cb = test_client_subscribe_cb,
};
static void notifier_main(void)
{
int ret;
/* 注册一个object,client可以订阅这个object */
ret = ubus_add_object(ctx, &test_object);
if (ret) {
fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret));
return;
}
/* 在需要的时候,向所有客户端发送notify消息 */
/* step1: 如果需要传递参数,则保存到struct blob_attr类型的结构体中。 */
/*
int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, int timeout);
type是一个字符串,自定义的。msg是需要携带的参数。如果需要等待回复,timeout需设置为>=0。
*/
while (1) {
sleep(2);
/* step2: 广播notification消息。 */
ubus_notify(ctx, &test_object, "say Hi!", NULL, -1);
}
}
int main(int argc, char **argv)
{
const char *ubus_socket = NULL;
uloop_init();
ctx = ubus_connect(ubus_socket);
if (!ctx) {
fprintf(stderr, "Failed to connect to ubus\n");
return -1;
}
ubus_add_uloop(ctx);
notifier_main();
uloop_run();
ubus_free(ctx);
uloop_done();
return 0;
}
|
notify_client.c:客户端订阅“test”对象,在收到3次消息后,随即取消对“test”对象的订阅。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
#include <unistd.h>
#include <libubox/blobmsg_json.h>
#include <libubox/uloop.h>
#include <libubus.h>
static struct ubus_context *ctx;
static int counter = 0;
static uint32_t obj_id;
static struct ubus_subscriber test_event;
static int test_notify(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req,
const char *method, struct blob_attr *msg)
{
printf("notify handler...\n");
counter++;
if (counter > 3)
ubus_unsubscribe(ctx, &test_event, obj_id); /* 取消订阅 */
return 0;
}
static void test_handle_remove(struct ubus_context *ctx,
struct ubus_subscriber *obj, uint32_t id)
{
printf("remove handler...\n");
}
static void subscriber_main(void)
{
int ret;
/* 通知到来时的处理函数。 */
test_event.cb = test_notify;
test_event.remove_cb = test_handle_remove; //server主动发起删除该client的订阅的cb函数(如server退出的时候)
/* 注册test_event */
ret = ubus_register_subscriber(ctx, &test_event);
if (ret)
fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret));
/* 得到要订阅的object的id */
ret = ubus_lookup_id(ctx, "test", &obj_id);
if (ret)
fprintf(stderr, "Failed to lookup object: %s\n", ubus_strerror(ret));
/* 订阅object */
ret = ubus_subscribe(ctx, &test_event, obj_id);
if (ret)
fprintf(stderr, "Failed to subscribe: %s\n", ubus_strerror(ret));
}
int main(int argc, char **argv)
{
const char *ubus_socket = NULL;
uloop_init();
ctx = ubus_connect(ubus_socket);
if (!ctx) {
fprintf(stderr, "Failed to connect to ubus\n");
return -1;
}
ubus_add_uloop(ctx);
subscriber_main();
uloop_run();
ubus_free(ctx);
uloop_done();
return 0;
}
|
先运行server&,注册可订阅的对象“test”,并随即每2秒向外广播通知消息。这时还没有client订阅这个对象。
运行多个client程序,由于每个client都订阅了“test”对象,则所有client都会收到server发出的消息。
当client取消订阅后,则不再收到server端的消息。
event的方式实现事件通知
event机制从一对一的进程之间通信来讲,和invoke机制类似。不过event机制中,
发送方不需要知道谁要接收这个消息,实际上就是一个广播消息。这类似于U盘的热插拔:当插上或拔出U盘时,
内核会广播一个NETLINK事件,之后内核继续做自己的事情,而不关心谁会去监听和处理这个事件。
下面的例子中,client端同时监听了“add_device”和“rm_device”两个事件,而server端会触发“add_device”
事件并携带device的一些信息发送出去。
event_client.c:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubus.h>
#include <json/json.h>
#include <libubox/blobmsg_json.h>
static struct ubus_context * ctx = NULL;
static const char * cli_path;
#define UBUS_EVENT_ADD_DEVICE "add_device"
#define UBUS_EVENT_REMOVE_DEVICE "rm_device"
static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev,
const char *type, struct blob_attr *msg)
{
char *str;
if (!msg)
return;
/*
在这里实现收到事件后的动作。
event也可以传递消息,放在msg中。
本例子只是将返回的消息打印出来。
*/
str = blobmsg_format_json(msg, true);
printf("{ \"%s\": %s }\n", type, str);
free(str);
}
static int client_ubus_register_events()
{
static struct ubus_event_handler listener;
int ret = 0;
/* 注册特定event的listener。多个event可以使用同一个listener */
memset(&listener, 0, sizeof(listener));
listener.cb = ubus_probe_device_event;
ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_ADD_DEVICE);
if (ret)
{
fprintf(stderr, "register event failed.\n");
return -1;
}
ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_REMOVE_DEVICE);
if (ret)
{
ubus_unregister_event_handler(ctx, &listener);
fprintf(stderr, "register event failed.\n");
return -1;
}
return 0;
}
static void ubus_add_fd(void)
{
ubus_add_uloop(ctx);
#ifdef FD_CLOEXEC
fcntl(ctx->sock.fd, F_SETFD,
fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);
#endif
}
static void ubus_reconn_timer(struct uloop_timeout *timeout)
{
static struct uloop_timeout retry =
{
.cb = ubus_reconn_timer,
};
int t = 2;
if (ubus_reconnect(ctx, cli_path) != 0) {
uloop_timeout_set(&retry, t * 1000);
return;
}
ubus_add_fd();
}
static void ubus_connection_lost(struct ubus_context *ctx)
{
ubus_reconn_timer(NULL);
}
static int client_ubus_init(const char *path)
{
uloop_init();
cli_path = path;
ctx = ubus_connect(path);
if (!ctx)
{
printf("ubus connect failed\n");
return -1;
}
printf("connected as %08x\n", ctx->local_id);
ctx->connection_lost = ubus_connection_lost;
ubus_add_fd();
return 0;
}
static void client_ubus_done(void)
{
if (ctx)
ubus_free(ctx);
}
int main(int argc, char * argv[])
{
char * path = NULL;
/* 连接ubusd */
if (UBUS_STATUS_OK != client_ubus_init(path))
{
printf("ubus connect failed!\n");
return -1;
}
/* 注册某个事件的处理函数 */
client_ubus_register_events();
uloop_run();
client_ubus_done();
return 0;
}
|
event_server.c:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubus.h>
#include <json/json.h>
#include <libubox/blobmsg_json.h>
static struct ubus_context * ctx = NULL;
static struct blob_buf b;
static const char * sock_path;
static int server_ubus_send_event(void)
{
blob_buf_init(&b, 0);
/* 需要传递的参数 */
blobmsg_add_u32(&b, "major", 3);
blobmsg_add_u32(&b, "minor", 56);
blobmsg_add_string(&b, "name", "mmc01");
/* 广播名为"add_device"的事件 */
return ubus_send_event(ctx, "add_device", b.head);
}
static int display_ubus_init(const char *path)
{
uloop_init();
sock_path = path;
ctx = ubus_connect(path);
if (!ctx)
{
printf("ubus connect failed\n");
return -1;
}
printf("connected as %08x\n", ctx->local_id);
return 0;
}
static void display_ubus_done(void)
{
if (ctx)
ubus_free(ctx);
}
int main(int argc, char * argv[])
{
char * path = NULL;
if (-1 == display_ubus_init(path))
{
printf("ubus connect failed!\n");
return -1;
}
server_ubus_send_event();
display_ubus_done();
return 0;
}
|
先运行client &注册事件。我们同时启动两个client程序。
再执行server主动触发"add_device"事件,则凡是注册了这个事件的client都会收到该事件并执行各自的处理。
1
2
3
4
|
root@NVR:~# ./server
connected as fdecbdc1
{ "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }
{ "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }
|
也可以通过命令行的ubus send命令触发事件:
1
2
3
|
root@NVR:~# ubus send "rm_device" '{ "major": 3, "minor": 56, "name": "mmc01" }'
{ "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }
{ "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }
|
在使用ubus时,可根据实际的场景来选择用哪种方式进行进程间通信。如之前所说,
ubus是为发送消息而设计的,不合适传输大量数据。