基于gdbus的p2p机能做成的类Windows的message机制
Charles Chan @ 2016-06-12 #GDBus #IPC #P2P @Program
Contents:
以下代码绝大部分来自于GLIB官方例程。
/*
For dbus-p2p-ipc-test :
$CC dbus-p2p-ipc.c -g -O0 -D_TEST -o dbus-p2p-ipc-test \
`pkg-config gio-2.0 --libs --cflags`
For libdbus-p2p-ipc.so :
$CC dbus-p2p-ipc.c -g -O0 -shared -fPIC -o libdbus-p2p-ipc.so \
`pkg-config gio-2.0 --libs --cflags`
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/prctl.h>
#include <gio/gio.h>
#ifdef _TEST
#define LOGD printf
#else
#define LOGD
#endif
#define LOGF(fmt,...) printf("%s "fmt, __FUNCTION__, ##__VA_ARGS__)
#define LOGE printf
#define CHECK_IF_FAIL(c) \
{\
if(!(c)) {\
LOGE("%s(%d): warning '%s' fail\n", __FILE__, __LINE__, #c);\
return;\
}\
}
#define CHECK_IF_FAIL2(c, r) \
{\
if(!(c)) {\
LOGE("%s(%d): warning '%s' fail\n", __FILE__, __LINE__, #c);\
return r;\
}\
}
typedef enum {
MSG_SEND_SUCCESS, /**< a message sent success */
MSG_SEND_FAIL, /**< a message sent fail */
MSG_ERR_PARAM, /**< parameter wrong */
MSG_ERR_CONNECT, /**< error occur when connecting to the other side */
MSG_ERROR /**< something wrong in the system which we not sure */
}my_ipc_result_t;
typedef struct _my_ipc_t my_ipc_t;
typedef void (*my_ipc_receive_msg_cb_t)(
my_ipc_t* ipc,
int id, const char* body, int body_len, void *user);
typedef void (*my_ipc_client_cb_t)(my_ipc_t* ipc, void *user);
typedef struct _my_ipc_t_{
char *label;
GDBusConnection *connection;
GDBusMethodInvocation *invocation;
GDBusNodeInfo *introspection_data;
gboolean is_server_side;
pthread_t tid;
GDBusServer *server;
GMainLoop *loop;
struct _my_ipc_t_ *parent;
GHashTable* hashmap;
my_ipc_receive_msg_cb_t rcv_cb;
my_ipc_client_cb_t clnt_cnnt;
my_ipc_client_cb_t clnt_discnnt;
void *user;
}_my_ipc_t;
/* -------------------------------------------------------------------------- */
/* Introspection data for the service we are exporting */
static const gchar introspection_xml[] =
"<node>"
" <interface name='com.zeerd.example.ipc'>"
" <method name='post'>"
" <arg type='i' name='id' direction='in'/>"
" <arg type='s' name='body' direction='in'/>"
" <arg type='s' name='result' direction='out'/>"
" </method>"
" <method name='send'>"
" <arg type='i' name='id' direction='in'/>"
" <arg type='s' name='body' direction='in'/>"
" <arg type='s' name='result' direction='out'/>"
" </method>"
" <signal name='signal'>"
" <arg type='i' name='id'/>"
" <arg type='s' name='body'/>"
" </signal>"
" </interface>"
"</node>";
/* -------------------------------------------------------------------------- */
static void
handle_method_call (GDBusConnection *connection,
const gchar *sender,
const gchar *object_path,
const gchar *interface_name,
const gchar *method_name,
GVariant *parameters,
GDBusMethodInvocation *invocation,
gpointer user_data)
{
LOGF("IN %p %s %s %s %s\n",
connection, sender, object_path, interface_name, method_name);
CHECK_IF_FAIL(user_data != NULL);
_my_ipc_t *_ipc = (_my_ipc_t*)user_data;
GError *error = NULL;
pid_t pid = -1;
_my_ipc_t *_sender = NULL;
GCredentials *credentials
= g_dbus_connection_get_peer_credentials(connection);
if(credentials != NULL) {
pid = g_credentials_get_unix_pid (credentials, &error);
if(error != NULL) {
LOGE("Error getting sender's pid : %s\n", error->message);
g_error_free (error);
}
else {
_sender = (_my_ipc_t*)g_hash_table_lookup(
_ipc->hashmap, (gpointer)(pid));
}
}
LOGD("sender's pid = %d\n", pid);
if (g_strcmp0 (method_name, "post") == 0) {
gint id;
const gchar *body;
g_variant_get (parameters, "(i&s)", &id, &body);
g_dbus_method_invocation_return_value (
invocation, g_variant_new ("(s)", ""));
_ipc->rcv_cb((my_ipc_t*)_sender, id, body, strlen(body), _ipc->user);
LOGD ("post : id = '%d', body = '%s'.\n", id, body);
}
else if (g_strcmp0 (method_name, "send") == 0) {
gint id;
const gchar *body;
g_variant_get (parameters, "(i&s)", &id, &body);
_my_ipc_t *_new = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
if(_new != NULL && _sender != NULL) {
LOGD("new send client ipc created : %p\n", _new);
_new->is_server_side = _sender->is_server_side;
_new->connection = _sender->connection;
_new->invocation = invocation;
_ipc->rcv_cb((my_ipc_t*)_new, id, body, strlen(body), _ipc->user);
}
LOGD ("send : id = '%d', body = '%s'.\n", id, body);
}
}
static const GDBusInterfaceVTable interface_vtable =
{
handle_method_call,
NULL,
NULL,
};
/* -------------------------------------------------------------------------- */
static gboolean
on_closed (GDBusConnection * connection,
gboolean remote_peer_vanished,
GError * error,
gpointer user_data)
{
CHECK_IF_FAIL2(user_data != NULL, FALSE);
_my_ipc_t *_ipc = (_my_ipc_t*)user_data;
_ipc->parent->clnt_discnnt((my_ipc_t*)_ipc, _ipc->parent->user);
return TRUE;
}
static gboolean
on_new_connection (GDBusServer *server,
GDBusConnection *connection,
gpointer user_data)
{
guint registration_id;
GCredentials *credentials;
gchar *s;
pid_t pid = -1;
CHECK_IF_FAIL2(user_data != NULL, FALSE);
_my_ipc_t *_svr = (_my_ipc_t*)user_data;
credentials = g_dbus_connection_get_peer_credentials (connection);
if (credentials == NULL)
s = g_strdup ("(no credentials received)");
else {
s = g_credentials_to_string (credentials);
GError *error = NULL;
pid = g_credentials_get_unix_pid (credentials, &error);
if(error != NULL) {
LOGE("Error getting connector's pid : %s\n", error->message);
g_error_free (error);
}
}
LOGD ("Client connected.\n"
"Peer credentials: %s\n"
"Negotiated capabilities: unix-fd-passing=%d\n"
"Unique name: %s\n",
s,
g_dbus_connection_get_capabilities (
connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING,
g_dbus_connection_get_unique_name(connection));
_my_ipc_t *_ipc = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
if(_ipc != NULL) {
LOGD("new client ipc created : %p\n", _ipc);
_ipc->is_server_side = TRUE;
_ipc->connection = connection;
_ipc->parent = _svr;
g_signal_connect (_ipc->connection,
"closed",
G_CALLBACK (on_closed),
_ipc);
if(pid > 0) {
g_hash_table_insert(_svr->hashmap, (gpointer)(pid), _ipc);
}
_svr->clnt_cnnt((my_ipc_t*)_ipc, _svr->user);
}
g_object_ref (connection);
registration_id = g_dbus_connection_register_object (connection,
"/com/zeerd/example/object",
_svr->introspection_data->interfaces[0],
&interface_vtable,
_svr, /* user_data */
NULL, /* user_data_free_func */
NULL); /* GError** */
g_assert (registration_id > 0);
return TRUE;
}
/* -------------------------------------------------------------------------- */
static void _init(_my_ipc_t *_ipc)
{
/* We are lazy here - we don't want to manually provide
* the introspection data structures - so we just build
* them from XML.
*/
_ipc->introspection_data
= g_dbus_node_info_new_for_xml (introspection_xml, NULL);
}
static void signalCallback (GDBusConnection *connection,
const gchar *sender_name,
const gchar *object_path,
const gchar *interface_name,
const gchar *signal_name,
GVariant *parameters,
gpointer user_data)
{
LOGF("IN\n");
CHECK_IF_FAIL(user_data != NULL);
_my_ipc_t *_ipc = (_my_ipc_t*)user_data;
gint id;
const gchar *body;
g_variant_get (parameters, "(i&s)", &id, &body);
_ipc->rcv_cb((my_ipc_t*)_ipc, id, body, strlen(body), _ipc->user);
LOGD ("signal : id = '%d', body = '%s'.\n", id, body);
LOGF("\n");
}
static void *ipc_client_loop(void *arg)
{
LOGF("IN\n");
_my_ipc_t *_ipc = (_my_ipc_t*)arg;
_ipc->loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (_ipc->loop);
g_object_unref (_ipc->connection);
g_dbus_node_info_unref (_ipc->introspection_data);
g_free(_ipc->label);
g_free(_ipc);
LOGF("OUT\n");
return (gpointer)0;
}
my_ipc_t* my_ipc_create_client(
const char* target_label,
my_ipc_receive_msg_cb_t cb, const gpointer user,
my_ipc_result_t *error)
{
LOGF("IN\n");
if(error != NULL) {*error = MSG_SEND_SUCCESS;}
_my_ipc_t *_ipc = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
if(_ipc != NULL) {
LOGD("client ipc created : %p\n", _ipc);
_init(_ipc);
GError *err = NULL;
_ipc->is_server_side = FALSE;
_ipc->label = g_strdup_printf("unix:abstract=%s", target_label);
_ipc->rcv_cb = cb;
_ipc->user = (gpointer)user;
_ipc->connection = g_dbus_connection_new_for_address_sync (
_ipc->label,
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
NULL, /* GDBusAuthObserver */
NULL, /* GCancellable */
&err);
if (_ipc->connection != NULL) {
LOGD("Connected.\n"
"Negotiated capabilities: unix-fd-passing=%d\n",
g_dbus_connection_get_capabilities (
_ipc->connection)
& G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);
guint subid = g_dbus_connection_signal_subscribe (
_ipc->connection,
NULL,
"com.zeerd.example.ipc",
NULL,
"/com/zeerd/example/object",
NULL,
G_DBUS_SIGNAL_FLAGS_NO_MATCH_RULE,
signalCallback,
(gpointer)_ipc,
NULL);
LOGD("g_dbus_connection_signal_subscribe : %d\n", subid);
pthread_create(&(_ipc->tid), NULL, ipc_client_loop, (gpointer)_ipc);
}
else {
LOGE("Error connecting to D-Bus address %s: %s\n",
_ipc->label, err->message);
g_error_free (err);
if(error != NULL) {*error = MSG_ERR_CONNECT;}
}
}
else {
if(error != NULL) {*error = MSG_ERROR;}
}
LOGF("OUT\n");
return (my_ipc_t*)_ipc;
}
int my_ipc_client_destory(my_ipc_t *ipc)
{
LOGF("IN\n");
_my_ipc_t *_ipc = (_my_ipc_t*)ipc;
CHECK_IF_FAIL2(ipc != NULL, -1);
if(_ipc->loop != NULL) {
g_main_loop_unref (_ipc->loop);
}
LOGF("OUT\n");
return 0;
}
static void *ipc_server_loop(void *arg)
{
LOGF("IN\n");
_my_ipc_t *_ipc = (_my_ipc_t*)arg;
_init(_ipc);
gchar *guid;
GDBusServerFlags server_flags;
guid = g_dbus_generate_guid ();
server_flags = G_DBUS_SERVER_FLAGS_NONE;
GError *error = NULL;
_ipc->server = g_dbus_server_new_sync (
_ipc->label,
server_flags,
guid,
NULL, /* GDBusAuthObserver */
NULL, /* GCancellable */
&error);
g_dbus_server_start (_ipc->server);
g_free (guid);
if (_ipc->server != NULL) {
LOGD("Server is listening at: %s\n",
g_dbus_server_get_client_address (_ipc->server));
g_signal_connect (_ipc->server,
"new-connection",
G_CALLBACK (on_new_connection),
_ipc);
_ipc->loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (_ipc->loop);
g_object_unref (_ipc->server);
g_main_loop_unref (_ipc->loop);
}
else {
LOGE("Error creating server at address %s: %s\n",
_ipc->label, error->message);
g_error_free (error);
}
g_hash_table_remove_all(_ipc->hashmap);
g_dbus_node_info_unref (_ipc->introspection_data);
g_free(_ipc->label);
g_free(_ipc);
LOGF("OUT\n");
return (gpointer)0;
}
my_ipc_t* my_ipc_create_server(
const char* self_label,
my_ipc_receive_msg_cb_t rcv,
my_ipc_client_cb_t clnt_cnnt,
my_ipc_client_cb_t clnt_discnnt,
const gpointer user,
my_ipc_result_t *error)
{
LOGF("IN\n");
if(error != NULL) {*error = MSG_SEND_SUCCESS;}
_my_ipc_t *_ipc = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
if(_ipc != NULL) {
LOGD("server ipc created : %p\n", _ipc);
_ipc->is_server_side = TRUE;
_ipc->rcv_cb = rcv;
_ipc->clnt_cnnt = clnt_cnnt;
_ipc->clnt_discnnt = clnt_discnnt;
_ipc->user = (gpointer)user;
_ipc->label = g_strdup_printf("unix:abstract=%s", self_label);
_ipc->hashmap = g_hash_table_new(g_direct_hash, g_direct_equal);
pthread_create(&(_ipc->tid), NULL, ipc_server_loop, (gpointer)_ipc);
while(_ipc->loop != NULL && !g_main_loop_is_running(_ipc->loop)) {
usleep(10);
}
}
else {
if(error != NULL) {*error = MSG_ERROR;}
}
LOGF("OUT\n");
return (my_ipc_t*)_ipc;
}
int my_ipc_service_destory(my_ipc_t *ipc)
{
LOGF("IN\n");
_my_ipc_t *_ipc = (_my_ipc_t*)ipc;
CHECK_IF_FAIL2(ipc != NULL, -1);
if(_ipc->loop != NULL) {
g_main_loop_unref (_ipc->loop);
}
LOGF("OUT\n");
return 0;
}
my_ipc_result_t my_ipc_post_message(
my_ipc_t *ipc, int id, const char* body, int body_len)
{
LOGF("IN %p %d %s\n", ipc, id, body);
CHECK_IF_FAIL2(ipc != NULL, MSG_ERR_PARAM);
my_ipc_result_t res = MSG_SEND_SUCCESS;
_my_ipc_t *_ipc = (_my_ipc_t*)ipc;
GError *error = NULL;
if(_ipc->is_server_side) {
g_dbus_connection_emit_signal (
_ipc->connection,
NULL, /* bus_name */
"/com/zeerd/example/object",
"com.zeerd.example.ipc",
"signal",
g_variant_new ("(is)", id, body),
&error);
if(error != NULL) {
LOGE("Error parsing options: %s\n", error->message);
g_error_free (error);
res = MSG_SEND_FAIL;
LOGE("my_ipc_post_message from server failed!\n");
}
}
else {
GVariant *result = g_dbus_connection_call_sync (
_ipc->connection,
NULL, /* bus_name */
"/com/zeerd/example/object",
"com.zeerd.example.ipc",
"post",
g_variant_new ("(is)", id, body),
NULL,
G_DBUS_CALL_FLAGS_NONE,
-1,
NULL,
&error);
if(result == NULL) {
LOGE("Error parsing options: %s\n", error->message);
g_error_free (error);
res = MSG_SEND_FAIL;
LOGE("my_ipc_post_message from client failed!\n");
}
}
LOGF("OUT\n");
return res;
}
my_ipc_result_t my_ipc_send_message(
my_ipc_t *ipc,
int id, const char* body, int body_len, int timeout,
char** result, int *result_len)
{
LOGF("IN %p %d %s\n", ipc, id, body);
_my_ipc_t *_ipc = (_my_ipc_t*)ipc;
CHECK_IF_FAIL2(_ipc != NULL, MSG_ERR_PARAM);
CHECK_IF_FAIL2(!(_ipc->is_server_side), MSG_ERR_PARAM);
my_ipc_result_t res = MSG_SEND_SUCCESS;
GError *error = NULL;
GVariant *value = NULL;
const gchar *response;
value = g_dbus_connection_call_sync (_ipc->connection,
NULL, /* bus_name */
"/com/zeerd/example/object",
"com.zeerd.example.ipc",
"send",
g_variant_new ("(is)", id, body),
G_VARIANT_TYPE ("(s)"),
G_DBUS_CALL_FLAGS_NONE,
timeout,
NULL,
&error);
if (value != NULL) {
g_variant_get (value, "(&s)", &response);
*result = g_strdup(response);
*result_len = strlen(response);
g_variant_unref (value);
}
else {
LOGE ("Error invoking HelloWorld(): %s\n", error->message);
g_error_free (error);
res = MSG_SEND_FAIL;
}
LOGF("OUT\n");
return res;
}
my_ipc_result_t my_ipc_send_message_complete(
my_ipc_t *ipc, int id, const char* body, int body_len)
{
LOGF("IN\n");
CHECK_IF_FAIL2(ipc != NULL, -1);
_my_ipc_t *_ipc = (_my_ipc_t*)ipc;
g_dbus_method_invocation_return_value (
_ipc->invocation, g_variant_new ("(s)", body));
g_free(_ipc);
LOGF("OUT\n");
return MSG_SEND_SUCCESS;
}
#ifdef _TEST
gboolean quit = FALSE;
GSList* client_queue;
static void sigTerminal(int signo)
{
quit = TRUE;
}
static void _server_receive_msg_cb(
my_ipc_t* ipc,
int id, const char* body, int body_len, void *user)
{
printf("%s : %p : %d : %s : %p\n", __FUNCTION__, ipc, id, body, user);
my_ipc_post_message(ipc, 4, "signal", strlen("signal"));
if(id == 2) {
my_ipc_send_message_complete(ipc, id, "back-send", strlen("back-send"));
}
}
static void _server_client_cnnt_cb(my_ipc_t* ipc, void *user)
{
printf("%s : %p : %p\n", __FUNCTION__, ipc, user);
client_queue = g_slist_append(client_queue, (gpointer)ipc);
}
static void _server_client_discnnt_cb(my_ipc_t* ipc, void *user)
{
printf("%s : %p : %p\n", __FUNCTION__, ipc, user);
client_queue = g_slist_remove(client_queue, (gpointer)ipc);
}
static void _client_receive_msg_cb(
my_ipc_t* ipc,
int id, const char* body, int body_len, void *user)
{
printf("%s : %p : %d : %s : %p\n", __FUNCTION__, ipc, id, body, user);
}
int
main (int argc, char *argv[])
{
gint ret;
gboolean opt_server;
GError *error;
GOptionEntry opt_entries[] =
{
{ "server", 's', 0, G_OPTION_ARG_NONE, &opt_server,
"Start a server instead of a client", NULL },
{ NULL}
};
ret = 1;
signal(SIGINT, sigTerminal);
opt_server = FALSE;
GOptionContext *opt_context;
opt_context = g_option_context_new ("peer-to-peer example");
error = NULL;
g_option_context_add_main_entries (opt_context, opt_entries, NULL);
if (!g_option_context_parse (opt_context, &argc, &argv, &error)) {
LOGE("Error parsing options: %s\n", error->message);
g_error_free (error);
}
if (opt_server) {
client_queue = NULL;
my_ipc_t* ipc = my_ipc_create_server(
"com.zeerd.example.ipc.test.server",
_server_receive_msg_cb,
_server_client_cnnt_cb,
_server_client_discnnt_cb,
NULL,
NULL);
while(!quit) {
int i;
for(i=0;i<g_slist_length(client_queue);i++) {
my_ipc_t *clnt = (my_ipc_t*)g_slist_nth_data(client_queue, i);
my_ipc_post_message(clnt, 3, "signal", strlen("signal"));
}
sleep(1);
}
my_ipc_service_destory(ipc);
g_slist_free_full(client_queue, g_free);
}
else {
my_ipc_t* ipc = my_ipc_create_client(
"com.zeerd.example.ipc.test.server",
_client_receive_msg_cb,
NULL,
NULL);
my_ipc_post_message(ipc, 1, "post", strlen("post"));
char *result = NULL;
int result_len = -1;
my_ipc_send_message(
ipc, 2, "send", strlen("send"), 1000, &result, &result_len);
printf("send-result : %s\n", result);
free(result);
sleep(3);
my_ipc_client_destory(ipc);
}
ret = 0;
return ret;
}
#endif