zeerd's blog         Search     Categories     Tags     Feed

闲来生雅趣,无事乐逍遥。对窗相望雪,一盏茶香飘。

基于gdbus的p2p机能做成的类Windows的message机制

#GDBus #IPC #P2P @Program


Contents:

以下代码绝大部分来自于GLIB官方例程

/*
    For dbus-p2p-ipc-test :

        $CC dbus-p2p-ipc.c -g -O0 -D_TEST -o dbus-p2p-ipc-test \
            `pkg-config gio-2.0 --libs --cflags`

    For libdbus-p2p-ipc.so :

        $CC dbus-p2p-ipc.c -g -O0 -shared -fPIC -o libdbus-p2p-ipc.so \
            `pkg-config gio-2.0 --libs --cflags`
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <sys/prctl.h>

#include <gio/gio.h>

#ifdef _TEST
    #define LOGD printf
#else
    #define LOGD
#endif

#define LOGF(fmt,...) printf("%s "fmt, __FUNCTION__, ##__VA_ARGS__)
#define LOGE printf

#define CHECK_IF_FAIL(c) \
    {\
        if(!(c)) {\
            LOGE("%s(%d): warning '%s' fail\n", __FILE__, __LINE__, #c);\
            return;\
        }\
    }

#define CHECK_IF_FAIL2(c, r) \
    {\
        if(!(c)) {\
            LOGE("%s(%d): warning '%s' fail\n", __FILE__, __LINE__, #c);\
            return r;\
        }\
    }

typedef enum {
    MSG_SEND_SUCCESS,   /**< a message sent success */
    MSG_SEND_FAIL,      /**< a message sent fail */
    MSG_ERR_PARAM,      /**< parameter wrong */
    MSG_ERR_CONNECT,    /**< error occur when connecting to the other side */
    MSG_ERROR           /**< something wrong in the system which we not sure */
}my_ipc_result_t;

typedef struct _my_ipc_t my_ipc_t;

typedef void (*my_ipc_receive_msg_cb_t)(
                    my_ipc_t* ipc,
                    int id, const char* body, int body_len, void *user);

typedef void (*my_ipc_client_cb_t)(my_ipc_t* ipc, void *user);

typedef struct _my_ipc_t_{

    char *label;

    GDBusConnection *connection;
    GDBusMethodInvocation *invocation;

    GDBusNodeInfo *introspection_data;

    gboolean is_server_side;
    pthread_t tid;
    GDBusServer *server;
    GMainLoop *loop;
    struct _my_ipc_t_ *parent;
    GHashTable* hashmap;

    my_ipc_receive_msg_cb_t rcv_cb;
    my_ipc_client_cb_t clnt_cnnt;
    my_ipc_client_cb_t clnt_discnnt;
    void *user;

}_my_ipc_t;

/* -------------------------------------------------------------------------- */

/* Introspection data for the service we are exporting */
static const gchar introspection_xml[] =
    "<node>"
    "    <interface name='com.zeerd.example.ipc'>"
    "        <method name='post'>"
    "            <arg type='i' name='id' direction='in'/>"
    "            <arg type='s' name='body' direction='in'/>"
    "            <arg type='s' name='result' direction='out'/>"
    "        </method>"
    "        <method name='send'>"
    "            <arg type='i' name='id' direction='in'/>"
    "            <arg type='s' name='body' direction='in'/>"
    "            <arg type='s' name='result' direction='out'/>"
    "        </method>"
    "        <signal name='signal'>"
    "            <arg type='i' name='id'/>"
    "            <arg type='s' name='body'/>"
    "        </signal>"
    "    </interface>"
    "</node>";

/* -------------------------------------------------------------------------- */

static void
handle_method_call (GDBusConnection *connection,
                    const gchar *sender,
                    const gchar *object_path,
                    const gchar *interface_name,
                    const gchar *method_name,
                    GVariant *parameters,
                    GDBusMethodInvocation *invocation,
                    gpointer user_data)
{
    LOGF("IN %p %s %s %s %s\n",
        connection, sender, object_path, interface_name, method_name);

    CHECK_IF_FAIL(user_data != NULL);

    _my_ipc_t *_ipc = (_my_ipc_t*)user_data;

    GError *error = NULL;
    pid_t pid = -1;
    _my_ipc_t *_sender = NULL;

    GCredentials *credentials
        = g_dbus_connection_get_peer_credentials(connection);
    if(credentials != NULL) {
        pid = g_credentials_get_unix_pid (credentials, &error);
        if(error != NULL) {
            LOGE("Error getting sender's pid : %s\n", error->message);
            g_error_free (error);
        }
        else {
            _sender = (_my_ipc_t*)g_hash_table_lookup(
                                    _ipc->hashmap, (gpointer)(pid));
        }
    }

    LOGD("sender's pid = %d\n", pid);

    if (g_strcmp0 (method_name, "post") == 0) {
        gint id;
        const gchar *body;

        g_variant_get (parameters, "(i&s)", &id, &body);

        g_dbus_method_invocation_return_value (
                invocation, g_variant_new ("(s)", ""));

        _ipc->rcv_cb((my_ipc_t*)_sender, id, body, strlen(body), _ipc->user);

        LOGD ("post : id = '%d', body = '%s'.\n", id, body);
    }
    else if (g_strcmp0 (method_name, "send") == 0) {
        gint id;
        const gchar *body;

        g_variant_get (parameters, "(i&s)", &id, &body);

        _my_ipc_t *_new = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
        if(_new != NULL && _sender != NULL) {
            LOGD("new send client ipc created : %p\n", _new);
            _new->is_server_side = _sender->is_server_side;
            _new->connection = _sender->connection;
            _new->invocation = invocation;
            _ipc->rcv_cb((my_ipc_t*)_new, id, body, strlen(body), _ipc->user);
        }

        LOGD ("send : id = '%d', body = '%s'.\n", id, body);
    }
}

static const GDBusInterfaceVTable interface_vtable =
{
    handle_method_call,
    NULL,
    NULL,
};

/* -------------------------------------------------------------------------- */

static gboolean
on_closed (GDBusConnection * connection,
            gboolean remote_peer_vanished,
            GError * error,
            gpointer user_data)
{

    CHECK_IF_FAIL2(user_data != NULL, FALSE);

    _my_ipc_t *_ipc = (_my_ipc_t*)user_data;

    _ipc->parent->clnt_discnnt((my_ipc_t*)_ipc, _ipc->parent->user);

    return TRUE;
}

static gboolean
on_new_connection (GDBusServer *server,
                                     GDBusConnection *connection,
                                     gpointer user_data)
{
    guint registration_id;
    GCredentials *credentials;
    gchar *s;
    pid_t pid = -1;

    CHECK_IF_FAIL2(user_data != NULL, FALSE);

    _my_ipc_t *_svr = (_my_ipc_t*)user_data;

    credentials = g_dbus_connection_get_peer_credentials (connection);
    if (credentials == NULL)
        s = g_strdup ("(no credentials received)");
    else {
        s = g_credentials_to_string (credentials);
        GError *error = NULL;
        pid = g_credentials_get_unix_pid (credentials, &error);
            if(error != NULL) {
                LOGE("Error getting connector's pid : %s\n", error->message);
                g_error_free (error);
            }
        }


    LOGD ("Client connected.\n"
                     "Peer credentials: %s\n"
                     "Negotiated capabilities: unix-fd-passing=%d\n"
                     "Unique name: %s\n",
                     s,
                     g_dbus_connection_get_capabilities (
                        connection) & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING,
                     g_dbus_connection_get_unique_name(connection));

    _my_ipc_t *_ipc = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
    if(_ipc != NULL) {
        LOGD("new client ipc created : %p\n", _ipc);
        _ipc->is_server_side = TRUE;
        _ipc->connection = connection;
        _ipc->parent = _svr;

        g_signal_connect (_ipc->connection,
                          "closed",
                          G_CALLBACK (on_closed),
                          _ipc);

        if(pid > 0) {
            g_hash_table_insert(_svr->hashmap, (gpointer)(pid), _ipc);
        }

        _svr->clnt_cnnt((my_ipc_t*)_ipc, _svr->user);
    }

    g_object_ref (connection);
    registration_id = g_dbus_connection_register_object (connection,
                            "/com/zeerd/example/object",
                            _svr->introspection_data->interfaces[0],
                            &interface_vtable,
                            _svr,    /* user_data */
                            NULL,    /* user_data_free_func */
                            NULL); /* GError** */
    g_assert (registration_id > 0);

    return TRUE;
}

/* -------------------------------------------------------------------------- */

static void _init(_my_ipc_t *_ipc)
{
    /* We are lazy here - we don't want to manually provide
     * the introspection data structures - so we just build
     * them from XML.
     */
    _ipc->introspection_data
        = g_dbus_node_info_new_for_xml (introspection_xml, NULL);
}

static void signalCallback (GDBusConnection *connection,
                        const gchar *sender_name,
                        const gchar *object_path,
                        const gchar *interface_name,
                        const gchar *signal_name,
                        GVariant *parameters,
                        gpointer user_data)
{
    LOGF("IN\n");
    CHECK_IF_FAIL(user_data != NULL);

    _my_ipc_t *_ipc = (_my_ipc_t*)user_data;

    gint id;
    const gchar *body;

    g_variant_get (parameters, "(i&s)", &id, &body);

    _ipc->rcv_cb((my_ipc_t*)_ipc, id, body, strlen(body), _ipc->user);

    LOGD ("signal : id = '%d', body = '%s'.\n", id, body);
    LOGF("\n");
}

static void *ipc_client_loop(void *arg)
{
    LOGF("IN\n");
    _my_ipc_t *_ipc = (_my_ipc_t*)arg;

    _ipc->loop = g_main_loop_new (NULL, FALSE);
    g_main_loop_run (_ipc->loop);

    g_object_unref (_ipc->connection);

    g_dbus_node_info_unref (_ipc->introspection_data);

    g_free(_ipc->label);
    g_free(_ipc);

    LOGF("OUT\n");
    return (gpointer)0;
}

my_ipc_t* my_ipc_create_client(
        const char* target_label,
        my_ipc_receive_msg_cb_t cb, const gpointer user,
        my_ipc_result_t *error)
{
    LOGF("IN\n");

    if(error != NULL) {*error = MSG_SEND_SUCCESS;}

    _my_ipc_t *_ipc = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));
    if(_ipc != NULL) {

        LOGD("client ipc created : %p\n", _ipc);

        _init(_ipc);

        GError *err = NULL;

        _ipc->is_server_side = FALSE;

        _ipc->label = g_strdup_printf("unix:abstract=%s", target_label);
        _ipc->rcv_cb = cb;
        _ipc->user = (gpointer)user;

        _ipc->connection = g_dbus_connection_new_for_address_sync (
                _ipc->label,
                G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
                NULL, /* GDBusAuthObserver */
                NULL, /* GCancellable */
                &err);

        if (_ipc->connection != NULL) {
            LOGD("Connected.\n"
                       "Negotiated capabilities: unix-fd-passing=%d\n",
                       g_dbus_connection_get_capabilities (
                            _ipc->connection)
                                & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING);

            guint subid = g_dbus_connection_signal_subscribe (
                _ipc->connection,
                NULL,
                "com.zeerd.example.ipc",
                NULL,
                "/com/zeerd/example/object",
                NULL,
                G_DBUS_SIGNAL_FLAGS_NO_MATCH_RULE,
                signalCallback,
                (gpointer)_ipc,
                NULL);
            LOGD("g_dbus_connection_signal_subscribe : %d\n", subid);

            pthread_create(&(_ipc->tid), NULL, ipc_client_loop, (gpointer)_ipc);
        }
        else {
            LOGE("Error connecting to D-Bus address %s: %s\n",
                _ipc->label, err->message);
            g_error_free (err);

            if(error != NULL) {*error = MSG_ERR_CONNECT;}
        }
    }
    else {
        if(error != NULL) {*error = MSG_ERROR;}
    }

    LOGF("OUT\n");
    return (my_ipc_t*)_ipc;
}

int my_ipc_client_destory(my_ipc_t *ipc)
{
    LOGF("IN\n");
    _my_ipc_t *_ipc = (_my_ipc_t*)ipc;

    CHECK_IF_FAIL2(ipc != NULL, -1);

    if(_ipc->loop != NULL) {
        g_main_loop_unref (_ipc->loop);
    }
    LOGF("OUT\n");
    return 0;
}

static void *ipc_server_loop(void *arg)
{
    LOGF("IN\n");
    _my_ipc_t *_ipc = (_my_ipc_t*)arg;

    _init(_ipc);

    gchar *guid;
    GDBusServerFlags server_flags;

    guid = g_dbus_generate_guid ();

    server_flags = G_DBUS_SERVER_FLAGS_NONE;

    GError *error = NULL;

    _ipc->server = g_dbus_server_new_sync (
                 _ipc->label,
                 server_flags,
                 guid,
                 NULL, /* GDBusAuthObserver */
                 NULL, /* GCancellable */
                 &error);
    g_dbus_server_start (_ipc->server);
    g_free (guid);

    if (_ipc->server != NULL) {
        LOGD("Server is listening at: %s\n",
            g_dbus_server_get_client_address (_ipc->server));
        g_signal_connect (_ipc->server,
                          "new-connection",
                          G_CALLBACK (on_new_connection),
                          _ipc);

        _ipc->loop = g_main_loop_new (NULL, FALSE);
        g_main_loop_run (_ipc->loop);

        g_object_unref (_ipc->server);

        g_main_loop_unref (_ipc->loop);
    }
    else {
        LOGE("Error creating server at address %s: %s\n",
            _ipc->label, error->message);
        g_error_free (error);
    }

    g_hash_table_remove_all(_ipc->hashmap);

    g_dbus_node_info_unref (_ipc->introspection_data);
    g_free(_ipc->label);
    g_free(_ipc);

    LOGF("OUT\n");
    return (gpointer)0;
}

my_ipc_t* my_ipc_create_server(
    const char* self_label,
    my_ipc_receive_msg_cb_t rcv,
    my_ipc_client_cb_t clnt_cnnt,
    my_ipc_client_cb_t clnt_discnnt,
    const gpointer user,
    my_ipc_result_t *error)
{

    LOGF("IN\n");

    if(error != NULL) {*error = MSG_SEND_SUCCESS;}

    _my_ipc_t *_ipc = (_my_ipc_t*)g_malloc0(sizeof(_my_ipc_t));

    if(_ipc != NULL) {
        LOGD("server ipc created : %p\n", _ipc);
        _ipc->is_server_side = TRUE;
        _ipc->rcv_cb = rcv;
        _ipc->clnt_cnnt = clnt_cnnt;
        _ipc->clnt_discnnt = clnt_discnnt;
        _ipc->user = (gpointer)user;
        _ipc->label = g_strdup_printf("unix:abstract=%s", self_label);

        _ipc->hashmap = g_hash_table_new(g_direct_hash, g_direct_equal);

        pthread_create(&(_ipc->tid), NULL, ipc_server_loop, (gpointer)_ipc);

        while(_ipc->loop != NULL && !g_main_loop_is_running(_ipc->loop)) {
            usleep(10);
        }
    }
    else {
        if(error != NULL) {*error = MSG_ERROR;}
    }

    LOGF("OUT\n");

    return (my_ipc_t*)_ipc;
}

int my_ipc_service_destory(my_ipc_t *ipc)
{
    LOGF("IN\n");
    _my_ipc_t *_ipc = (_my_ipc_t*)ipc;

    CHECK_IF_FAIL2(ipc != NULL, -1);

    if(_ipc->loop != NULL) {
        g_main_loop_unref (_ipc->loop);
    }

    LOGF("OUT\n");
    return 0;
}

my_ipc_result_t my_ipc_post_message(
        my_ipc_t *ipc, int id, const char* body, int body_len)
{
    LOGF("IN %p %d %s\n", ipc, id, body);

    CHECK_IF_FAIL2(ipc != NULL, MSG_ERR_PARAM);

    my_ipc_result_t res = MSG_SEND_SUCCESS;

    _my_ipc_t *_ipc = (_my_ipc_t*)ipc;

    GError *error = NULL;

    if(_ipc->is_server_side) {

        g_dbus_connection_emit_signal (
            _ipc->connection,
            NULL, /* bus_name */
            "/com/zeerd/example/object",
            "com.zeerd.example.ipc",
            "signal",
            g_variant_new ("(is)", id, body),
            &error);

        if(error != NULL) {
            LOGE("Error parsing options: %s\n", error->message);
            g_error_free (error);
            res = MSG_SEND_FAIL;
            LOGE("my_ipc_post_message from server failed!\n");
        }
    }
    else {

        GVariant *result = g_dbus_connection_call_sync (
            _ipc->connection,
            NULL, /* bus_name */
            "/com/zeerd/example/object",
            "com.zeerd.example.ipc",
            "post",
            g_variant_new ("(is)", id, body),
            NULL,
            G_DBUS_CALL_FLAGS_NONE,
            -1,
            NULL,
            &error);

        if(result == NULL) {
            LOGE("Error parsing options: %s\n", error->message);
            g_error_free (error);
            res = MSG_SEND_FAIL;
            LOGE("my_ipc_post_message from client failed!\n");
        }

    }

    LOGF("OUT\n");
    return res;
}

my_ipc_result_t my_ipc_send_message(
    my_ipc_t *ipc,
    int id, const char* body, int body_len, int timeout,
    char** result, int *result_len)
{
    LOGF("IN %p %d %s\n", ipc, id, body);

    _my_ipc_t *_ipc = (_my_ipc_t*)ipc;

    CHECK_IF_FAIL2(_ipc != NULL, MSG_ERR_PARAM);
    CHECK_IF_FAIL2(!(_ipc->is_server_side), MSG_ERR_PARAM);

    my_ipc_result_t res = MSG_SEND_SUCCESS;

    GError *error = NULL;
    GVariant *value = NULL;
    const gchar *response;

    value = g_dbus_connection_call_sync (_ipc->connection,
                NULL, /* bus_name */
                "/com/zeerd/example/object",
                "com.zeerd.example.ipc",
                "send",
                g_variant_new ("(is)", id, body),
                G_VARIANT_TYPE ("(s)"),
                G_DBUS_CALL_FLAGS_NONE,
                timeout,
                NULL,
                &error);

    if (value != NULL) {
        g_variant_get (value, "(&s)", &response);
        *result = g_strdup(response);
        *result_len = strlen(response);
        g_variant_unref (value);
    }
    else {
        LOGE ("Error invoking HelloWorld(): %s\n", error->message);
        g_error_free (error);
        res = MSG_SEND_FAIL;
    }

    LOGF("OUT\n");
    return res;
}

my_ipc_result_t my_ipc_send_message_complete(
    my_ipc_t *ipc, int id, const char* body, int body_len)
{
    LOGF("IN\n");
    CHECK_IF_FAIL2(ipc != NULL, -1);

    _my_ipc_t *_ipc = (_my_ipc_t*)ipc;

    g_dbus_method_invocation_return_value (
        _ipc->invocation, g_variant_new ("(s)", body));

    g_free(_ipc);

    LOGF("OUT\n");
    return  MSG_SEND_SUCCESS;
}

#ifdef _TEST

gboolean quit = FALSE;
GSList* client_queue;

static void sigTerminal(int signo)
{
    quit = TRUE;
}

static void _server_receive_msg_cb(
                    my_ipc_t* ipc,
                    int id, const char* body, int body_len, void *user)
{
    printf("%s : %p : %d : %s : %p\n", __FUNCTION__, ipc, id, body, user);

    my_ipc_post_message(ipc, 4, "signal", strlen("signal"));

    if(id == 2) {
        my_ipc_send_message_complete(ipc, id, "back-send", strlen("back-send"));
    }

}

static void _server_client_cnnt_cb(my_ipc_t* ipc, void *user)
{
    printf("%s : %p : %p\n", __FUNCTION__, ipc, user);
    client_queue = g_slist_append(client_queue, (gpointer)ipc);
}

static void _server_client_discnnt_cb(my_ipc_t* ipc, void *user)
{
    printf("%s : %p : %p\n", __FUNCTION__, ipc, user);
    client_queue = g_slist_remove(client_queue, (gpointer)ipc);
}

static void _client_receive_msg_cb(
                    my_ipc_t* ipc,
                    int id, const char* body, int body_len, void *user)
{
    printf("%s : %p : %d : %s : %p\n", __FUNCTION__, ipc, id, body, user);
}

int
main (int argc, char *argv[])
{
    gint ret;
    gboolean opt_server;
    GError *error;
    GOptionEntry opt_entries[] =
        {
            { "server", 's', 0, G_OPTION_ARG_NONE, &opt_server,
                "Start a server instead of a client", NULL },
            { NULL}
        };

    ret = 1;

    signal(SIGINT, sigTerminal);

    opt_server = FALSE;

    GOptionContext *opt_context;

    opt_context = g_option_context_new ("peer-to-peer example");
    error = NULL;
    g_option_context_add_main_entries (opt_context, opt_entries, NULL);
    if (!g_option_context_parse (opt_context, &argc, &argv, &error)) {
        LOGE("Error parsing options: %s\n", error->message);
        g_error_free (error);
    }


    if (opt_server) {
        client_queue = NULL;
        my_ipc_t* ipc = my_ipc_create_server(
            "com.zeerd.example.ipc.test.server",
            _server_receive_msg_cb,
            _server_client_cnnt_cb,
            _server_client_discnnt_cb,
            NULL,
            NULL);

        while(!quit) {
            int i;
            for(i=0;i<g_slist_length(client_queue);i++) {
                my_ipc_t *clnt = (my_ipc_t*)g_slist_nth_data(client_queue, i);
                my_ipc_post_message(clnt, 3, "signal", strlen("signal"));
            }
            sleep(1);
        }

        my_ipc_service_destory(ipc);
        g_slist_free_full(client_queue, g_free);
    }
    else {
        my_ipc_t* ipc = my_ipc_create_client(
            "com.zeerd.example.ipc.test.server",
            _client_receive_msg_cb,
            NULL,
            NULL);

        my_ipc_post_message(ipc, 1, "post", strlen("post"));

        char *result = NULL;
        int result_len = -1;
        my_ipc_send_message(
            ipc, 2, "send", strlen("send"), 1000, &result, &result_len);

        printf("send-result : %s\n", result);
        free(result);

        sleep(3);

        my_ipc_client_destory(ipc);
    }

    ret = 0;

    return ret;
}

#endif