一、背景
在实际工程中,难免会遇到不通系统之间通信,如何进行系统之间通信呢?(作为一个“全栈工程师”,必须要解决它!)。
系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等。目前,本人从事的项目中遇到web业务工程(java)依赖与算法工程(c ) 处理的视频/图片分类与标记结果。两个系统之前数据通信采用了kafka消息方式。
算法工程为c/c 工程,本文将介绍如何在c/c 中如何发送与接收kakfa消息(包含:kafka的sasl认证方式),并提供了详细的源码和讲解。(至于java中如何发送与接收kakfa消息如有需要,可留言或私聊!)
二、环境依赖安装
# 下载librdkafka git clone https://github.com/edenhill/librdkafka.git # 编译 cd librdkafka ./configure --prefix=/usr/local # 安装 sudo make install # 验证:查看/usr/local/lib目录下是否有librdkafka文件 ls /usr/local/lib | grep kafka
三、编写kakfa生产者消费者
3.1 生产者
#include// 包含c api头文件 #include #include #include int main() { const char *brokers = "xx.xx.xx.xx:7091"; // kafka broker地址 const char *topic_name = "kafka_msg_topic_test"; const char *payload = "hello, kafka from librdkafka!"; size_t len = strlen(payload); // 创建配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 设置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, null, 0) != rd_kafka_conf_ok) { std::cerr << "failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 创建生产者实例 rd_kafka_t *rk = rd_kafka_new(rd_kafka_producer, conf, null, 0); if (!rk) { std::cerr << "failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 创建topic句柄(可选,但推荐) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, null); if (!rkt) { std::cerr << "failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 发送消息 int32_t partition = rd_kafka_partition_ua; // 自动选择分区 int err = rd_kafka_produce(rkt, partition, rd_kafka_msg_f_copy, const_cast (payload), len, null, 0, null); if (err != rd_kafka_resp_err_no_error) { std::cerr << "failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息发送完成(可选,但推荐) // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认 int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 轮询kafka队列,直到所有消息都发送出去 msgs_sent = rd_kafka_outq_len(rk); } // 销毁topic句柄 rd_kafka_topic_destroy(rkt); // 销毁生产者实例 rd_kafka_destroy(rk); // 销毁配置对象 // rd_kafka_conf_destroy(conf); return 0; }
3.2 消费者
#include#include #include #include #include void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) { // 错误处理回调 std::cerr << "kafka error: " << err << ": " << reason << std::endl; } int main() { std::cerr << "start " << std::endl; const char *brokers = "xx.xx.xx.xx:7091"; // kafka broker地址 const char *group_id = "kafka_msg_topic_test"; // 消费者组id const char *topic_name = "kafka_msg_topic_test"; // kafka topic名称 // 创建配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 设置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, null, 0) != rd_kafka_conf_ok) { std::cerr << "failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 设置消费者组id if (rd_kafka_conf_set(conf, "group.id", group_id, null, 0) != rd_kafka_conf_ok) { std::cerr << "failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 设置错误处理回调(可选) rd_kafka_conf_set_error_cb(conf, error_cb); // 创建消费者实例 rd_kafka_t *rk = rd_kafka_new(rd_kafka_consumer, conf, null, 0); if (!rk) { std::cerr << "failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 创建一个topic分区列表 rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1); if (!topics) { std::cerr << "failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); return 1; } // 添加topic到分区列表 if (!rd_kafka_topic_partition_list_add(topics, topic_name, rd_kafka_partition_ua)) { std::cerr << "failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 订阅topic rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics); if (err != rd_kafka_resp_err_no_error) { std::cerr << "failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 销毁分区列表(订阅后不再需要) rd_kafka_topic_partition_list_destroy(topics); // 轮询消息 while (true) { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以获取消息 if (rkmessage == null) { // 没有消息或者超时 continue; } if (rkmessage->err) { // 处理错误 if (rkmessage->err == rd_kafka_resp_err__partition_eof) { // 消息流的末尾 std::cout << "end of partition event" << std::endl; } else { // 打印错误并退出 std::cerr << "kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl; break; } } else { // 处理消息 std::cout << "received message at offset " << rkmessage->offset << " from partition " << rkmessage->partition << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len << " value :" <<(char *)rkmessage->payload << std::endl; // 如果需要,可以在这里处理消息内容 // 例如,使用rkmessage->payload()获取消息内容 // 释放消息 rd_kafka_message_destroy(rkmessage); } } // 清理 rd_kafka_destroy(rk); return 0; }
3.3 编译运行
3.3.1 编译生产者消费者
g -o send_kafka sendkakfamessage.cpp -i/usr/local/include/librdkafka -lrdkafka -lrdkafka -lpthread
g -o receive_kafka receivekafkamessage.cpp -i/usr/local/include/librdkafka -lrdkafka -lrdkafka -lpthread
3.3.2 运行验证
执行时,若出现错误: error while loading shared libraries: librdkafka .so.1: cannot open shared object file: no such file or directory
则需要执行下面环境变量配置:
export ld_library_path=/usr/local/lib:$ld_library_path
生产者:发送消息
消费者:接收消息
3.4 sasl认证kakfa
下面是,支持sasl认证的kakka生产者完整代码
#include#include #include #include int main(int argc, char *argv[]) { const char *brokers = "xx.xx.xx.xx:8092"; // kafka broker地址 const char *username = "xxx"; const char *password = "xxx"; const char *topic_name = "kafka_msg_test_sasl"; const char *payload = "hello, kafka from librdkafka! sasl"; size_t len = strlen(payload); // 初始化配置 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } char errstr[512]; // 声明一个足够大的字符数组来存储错误信息 // 设置sasl相关的配置 if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != rd_kafka_conf_ok) { std::cerr << "failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.mechanisms", "plain", errstr, sizeof(errstr)) != rd_kafka_conf_ok) { std::cerr << "failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != rd_kafka_conf_ok) { std::cerr << "failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != rd_kafka_conf_ok) { std::cerr << "failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != rd_kafka_conf_ok) { std::cerr << "failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 检查配置是否设置成功 if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != rd_kafka_conf_ok) { std::cerr << "failed to set configuration: " << errstr << std::endl; return 1; } // 创建producer实例 rd_kafka_t *rk = rd_kafka_new(rd_kafka_producer, conf, errstr, sizeof(errstr)); if (!rk) { std::cerr << "failed to create new producer: " << errstr << std::endl; return 1; } // 创建topic句柄(可选,但推荐) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, null); if (!rkt) { std::cerr << "failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 发送消息 int32_t partition = rd_kafka_partition_ua; // 自动选择分区 int err = rd_kafka_produce(rkt, partition, rd_kafka_msg_f_copy, const_cast (payload), len, null, 0, null); if (err != rd_kafka_resp_err_no_error) { std::cerr << "failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息发送完成(可选,但推荐) // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认 int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 轮询kafka队列,直到所有消息都发送出去 msgs_sent = rd_kafka_outq_len(rk); } // 销毁topic句柄 rd_kafka_topic_destroy(rkt); // 清理资源 rd_kafka_destroy(rk); return 0; }
在kafka map 管理界面中查看发送效果如下: