今天给各位分享c语言kafka的知识,其中也会对C语言代码进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
1、kafka解析程序用c语言好还是java号2、如何在kafka-python和confluent-kafka之间做出选择3、windows下c++中使用kafka
kafka解析程序用c语言好还是java号
我这里是使用的是,kafka自带的zookeeper。
以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的
1、[hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps
2625 Jps
2、[hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
此刻,这时,会一直停在这,因为是前端运行。
另开一窗口,
3、[hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties
也是前端运行。
如何在kafka-python和confluent-kafka之间做出选择
kafka-python:蛮荒的西部
kafka-python是最受欢迎的Kafka Python客户端。我们过去使用时从未出现过任何问题,在我的《敏捷数据科学2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。我们发现,当以文档化的方式使用KafkaConsumer、Consumer迭代式地从消息队列中获取消息时,最终到达主题topic的由Consumer携带的消息通常会丢失。我们通过控制台Consumer的分析验证了这一点。
需要更详细说明的是,kafka-python和KafkaConsumer是与一个由SSL保护的Kafka服务(如Aiven Kafka)一同使用的,如下面这样:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol=’SSL’,
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
…
当以这样的推荐方式使用时,KafkaConsumer会丢失消息。但有一个变通方案,就是保留所有消息。这个方案是Kafka服务提供商Aiven support提供给我们的。它看起来像这样:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
…
虽然这个变通方案可能有用,但README中的方法会丢弃消息使我对其失去兴趣。所以我找到了一个替代方案。
confluent-kafka:企业支持
发现coufluent-kafka Python模块时,我感到无比惊喜。它既能做librdkafka的外封装,又非常小巧。librdkafka是一个用C语言写的kafka库,它是Go和.NET的基础。更重要的是,它由Confluent公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。
用confluent-kafka替换kafka-python非常简单。confluent-kafka使用poll方法,它类似于上面提到的访问kafka-python的变通方案。
kafka_consumer = Consumer(
{
“api.version.request”: True,
“enable.auto.commit”: True,
“group.id”: group_id,
“bootstrap.servers”: config.kafka.host,
“security.protocol”: “ssl”,
“ssl.ca.location”: config.kafka.ca_pem,
“ssl.certificate.location”: config.kafka.service_cert,
“ssl.key.location”: config.kafka.service_key,
“default.topic.config”: {“auto.offset.reset”: “smallest”}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
现在我们能收到所有消息了。我并不是说kafka-python工具不好,我相信社区会对它的问题做出反应并解决。但从现在开始,我会一直坚持使用confluent-kafka。
开源治理
开源是强大的,但是涉及到复杂的“大数据”和NoSQL工具时,通常需要有一家大公司在背后推动工具的开发。这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似FOSS的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。
理想情况是采取开源治理,就像Apache基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。
信任开源
对于更大型的工具,以上决策评估过程更为复杂。通常,我会看一下提交问题和贡献者的数量,以及最后一次commit的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。当你进行嗅探检查后从Github选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。
但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。
windows下c++中使用kafka
kafka的c/c++的client有
用户最多的是librdkafka, github上有2000+star, 笔者使用的也是librdkafka
还没有正式的0.11.6 release版本,故而笔者选用的是v0.11.5版本,然后掉坑里了
v0.11.5版本rd_lock() 有个bug,在window平台下,机器开机超过7天,rd_lock()的返回值就溢出了,导致无法produce和consume;笔者的机器是常年不关的,所以写测试程序的时候,一开始就发现无法produce和consume,一度怀疑是不是配置没有 配好,还是跟kafka server 版本不兼容,然后用go的kafka client sarama 几分钟完成测试,弄的我快崩溃了,一度想放弃;最后没办法,就单步调试,发现是因为获取系统时间溢出,导致produce request 和 fetch request一直无法满足条件去执行;v0.11.6.-rc2 已经修复这个问题或者版本回退到v0.11.4也没有这个问题。。。
下面几个issues都是这个问题导致的
鉴于官方还没有正式发布v0.11.6,笔者选用了最新的 v0.11.6-rc4
关于c语言kafka和C语言代码的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。