From 33c09179d88e7c3a358c747499404ce1b3e6d4d6 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Sat, 12 Oct 2019 17:28:59 +0800 Subject: [PATCH 01/23] feat(producer) add transaction message support --- src/PythonWrapper.cpp | 66 +++++++++++++++++++++++++++++++++++-------- src/PythonWrapper.h | 5 ++++ 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 404c92a..cf310b9 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -33,6 +33,7 @@ const char *VERSION = "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " "; map> g_CallBackMap; +map g_TransactionCallBackMap; class PyThreadStateLock { public: @@ -134,6 +135,21 @@ void *PyCreateProducer(const char *groupId) { PyEval_InitThreads(); // ensure create GIL, for call Python callback from C. return (void *) CreateProducer(groupId); } + +void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback) { + PyEval_InitThreads(); + CProducer *producer = CreateTransactionProducer(groupId, &PyLocalTransactionCheckerCallback, localTransactionCheckerCallback); + g_TransactionCallBackMap[producer] = localTransactionCheckerCallback; + return producer; +} + +CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data) { + PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback + PyObject * checkerCallback = (PyObject *) data; + CTransactionStatus status = boost::python::call(checkerCallback, (void *) msg); + return status; +} + int PyDestroyProducer(void *producer) { return DestroyProducer((CProducer *) producer); } @@ -190,23 +206,23 @@ int PySendMessageOneway(void *producer, void *msg) { return SendMessageOneway((CProducer *) producer, (CMessage *) msg); } -void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){ +void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback) { PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PySendResult sendResult; sendResult.sendStatus = result.sendStatus; sendResult.offset = result.offset; strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; - PyCallback *callback = (PyCallback *)pyCallback; + PyCallback *callback = (PyCallback *) pyCallback; boost::python::call(callback->successCallback, sendResult, (void *) msg); delete pyCallback; } -void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ +void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback) { PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback PyMQException exception; - PyCallback *callback = (PyCallback *)pyCallback; + PyCallback *callback = (PyCallback *) pyCallback; exception.error = e.error; exception.line = e.line; strncpy(exception.file, e.file, MAX_EXEPTION_FILE_LENGTH - 1); @@ -219,11 +235,12 @@ void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ delete pyCallback; } -int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ - PyCallback* pyCallback = new PyCallback(); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback) { + PyCallback *pyCallback = new PyCallback(); pyCallback->successCallback = sendSuccessCallback; pyCallback->exceptionCallback = sendExceptionCallback; - return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback); + return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, + (void *) pyCallback); } PySendResult PySendBatchMessage(void *producer, void *batchMessage) { @@ -241,8 +258,9 @@ PySendResult PySendBatchMessage(void *producer, void *batchMessage) { PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) { PySendResult ret; CSendResult result; - PyUserData userData = {queueSelector,args}; - SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes, &result); + PyUserData userData = {queueSelector, args}; + SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes, + &result); ret.sendStatus = result.sendStatus; ret.offset = result.offset; strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); @@ -251,7 +269,7 @@ PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, } int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { - PyUserData *userData = (PyUserData *)args; + PyUserData *userData = (PyUserData *) args; int index = boost::python::call(userData->pyObject, size, (void *) msg, userData->pData); return index; } @@ -267,6 +285,26 @@ PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const return ret; } +CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data) { + PyUserData *localCallback = (PyUserData *) data; + CTransactionStatus status = boost::python::call(localCallback->pyObject, (void *) msg, + localCallback->pData); + return status; +} + +PySendResult PySendMessageInTransaction(void *producer, void *msg, PyObject *localTransactionCallback, void *args) { + PyUserData userData = {localTransactionCallback, args}; + PySendResult ret; + CSendResult result; + SendMessageTransaction((CProducer *) producer, (CMessage *) msg, &PyLocalTransactionExecuteCallback, &userData, + &result); + ret.sendStatus = result.sendStatus; + ret.offset = result.offset; + strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); + ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + return ret; +} + //SendResult const char *PyGetSendResultMsgID(CSendResult &sendResult) { return (const char *) (sendResult.msgId); @@ -308,7 +346,7 @@ int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner); } -int PyRegisterMessageCallbackOrderly(void *consumer, PyObject *pCallback, object args){ +int PyRegisterMessageCallbackOrderly(void *consumer, PyObject *pCallback, object args) { CPushConsumer *consumerInner = (CPushConsumer *) consumer; g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args)); return RegisterMessageCallbackOrderly(consumerInner, &PythonMessageCallBackInner); @@ -418,6 +456,10 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { .value("E_LOG_LEVEL_TRACE", E_LOG_LEVEL_TRACE) .value("E_LOG_LEVEL_LEVEL_NUM", E_LOG_LEVEL_LEVEL_NUM); + enum_("TransactionStatus") + .value("E_COMMIT_TRANSACTION", E_COMMIT_TRANSACTION) + .value("E_ROLLBACK_TRANSACTION", E_ROLLBACK_TRANSACTION) + .value("E_UNKNOWN_TRANSACTION", E_UNKNOWN_TRANSACTION); //For Message def("CreateMessage", PyCreateMessage, return_value_policy()); @@ -445,6 +487,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { //For producer def("CreateProducer", PyCreateProducer, return_value_policy()); + def("CreateTransactionProducer", PyCreateTransactionProducer, return_value_policy()); def("DestroyProducer", PyDestroyProducer); def("StartProducer", PyStartProducer); def("ShutdownProducer", PyShutdownProducer); @@ -467,6 +510,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SendMessageOneway", PySendMessageOneway); def("SendMessageOrderly", PySendMessageOrderly); def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey); + def("SendMessageInTransaction", PySendMessageInTransaction); //For Consumer def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy()); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 29a4952..ee7026e 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -106,6 +106,10 @@ const char *PyGetMessageId(PyMessageExt msgExt); //producer void *PyCreateProducer(const char *groupId); +CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data); +CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data); +void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback); + int PyDestroyProducer(void *producer); int PyStartProducer(void *producer); int PyShutdownProducer(void *producer); @@ -130,6 +134,7 @@ int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PySendResult PySendBatchMessage(void *producer, void *msg); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey); +PySendResult PySendMessageInTransaction(void *producer , void *msg, PyObject *localTransactionExecuteCallback , void *args); int PyOrderlyCallbackInner(int size, CMessage *msg, void *args); From edad8caffe9e0456fca15872cb083667f703a910 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Sun, 13 Oct 2019 18:36:56 +0800 Subject: [PATCH 02/23] feat(producer) add multi transaction producer support --- src/PythonWrapper.cpp | 21 ++++++++++++++------- test/TestSendMessages.py | 32 +++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index cf310b9..6083df9 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -33,7 +33,8 @@ const char *VERSION = "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " "; map> g_CallBackMap; -map g_TransactionCallBackMap; +map g_TransactionCheckCallBackMap; + class PyThreadStateLock { public: @@ -138,16 +139,22 @@ void *PyCreateProducer(const char *groupId) { void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback) { PyEval_InitThreads(); - CProducer *producer = CreateTransactionProducer(groupId, &PyLocalTransactionCheckerCallback, localTransactionCheckerCallback); - g_TransactionCallBackMap[producer] = localTransactionCheckerCallback; + CProducer *producer = CreateTransactionProducer(groupId, &PyLocalTransactionCheckerCallback, NULL); + g_TransactionCheckCallBackMap[producer] = localTransactionCheckerCallback; return producer; } CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data) { - PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback - PyObject * checkerCallback = (PyObject *) data; - CTransactionStatus status = boost::python::call(checkerCallback, (void *) msg); - return status; + PyThreadStateLock pyThreadLock; // ensure hold GIL, before call python callback + PyMessageExt message = {.pMessageExt = msg}; + map::iterator iter; + iter = g_TransactionCheckCallBackMap.find(producer); + if (iter != g_TransactionCheckCallBackMap.end()) { + PyObject *pCallback = iter->second; + CTransactionStatus status = boost::python::call(pCallback, message); + return status; + } + return CTransactionStatus::E_UNKNOWN_TRANSACTION; } int PyDestroyProducer(void *producer) { diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index 9baf78e..9ab8cf2 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -31,8 +31,18 @@ def init_producer(): StartProducer(producer) return producer +def transaction_local_checker(msg): + print 'begin check for msg: ' + PyGetMessageId(msg) + return TransactionStatus.E_COMMIT_TRANSACTION -producer = init_producer() +def init_transaction_producer(): + producer = CreateTransactionProducer('TransactionTestProducer', transaction_local_checker) + SetProducerLogLevel(producer, CLogLevel.E_LOG_LEVEL_INFO) + SetProducerNameServerAddress(producer, name_srv) + StartProducer(producer) + return producer + +producer = init_transaction_producer() tag = 'rmq-tag' key = 'rmq-key' @@ -257,24 +267,24 @@ def send_message_async_fail(msg, exception): print 'send message failed' print 'error msg: ' + exception.GetMsg() -def send_batch_message(batch_count): +def send_transaction_message(count): key = 'rmq-key' - print 'start send batch message' + print 'start send transaction message' tag = 'test' - batchMsg = CreateBatchMessage() - for n in range(count): body = 'hi rmq message, now is' + str(n) msg = CreateMessage(topic) SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, tag) - AddMessage(batchMsg, msg) - DestroyMessage(msg) - SendBatchMessage(producer, batchMsg) - DestroyBatchMessage(batchMsg) - print 'send batch message done' + SendMessageInTransaction(producer, msg, transaction_local_execute, None) + print 'send transaction message done' + time.sleep(10000) + +def transaction_local_execute(msg, args): + print 'begin execute local transaction' + return TransactionStatus.E_UNKNOWN_TRANSACTION if __name__ == '__main__': - send_message_async(10) + send_transaction_message(10) From 0f58c739e8c076b145095d3dff01989d26c0fefd Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Sun, 13 Oct 2019 21:14:26 +0800 Subject: [PATCH 03/23] feat(producer) add transaction producer destroy logic --- src/PythonWrapper.cpp | 10 ++++++++++ src/PythonWrapper.h | 1 + test/TestSendMessages.py | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 6083df9..af80175 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -331,6 +331,15 @@ int PyDestroyPushConsumer(void *consumer) { } return DestroyPushConsumer(consumerInner); } +int PyDestroyTransactionProducer(void *producer) { + CProducer *producerInner = (CProducer *) producer; + map::iterator iter; + iter = g_TransactionCheckCallBackMap.find(producerInner); + if (iter != g_TransactionCheckCallBackMap.end()) { + g_TransactionCheckCallBackMap.erase(iter); + } + return DestroyProducer(producerInner); +} int PyStartPushConsumer(void *consumer) { return StartPushConsumer((CPushConsumer *) consumer); } @@ -496,6 +505,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("CreateProducer", PyCreateProducer, return_value_policy()); def("CreateTransactionProducer", PyCreateTransactionProducer, return_value_policy()); def("DestroyProducer", PyDestroyProducer); + def("DestroyTransactionProducer", PyDestroyTransactionProducer); def("StartProducer", PyStartProducer); def("ShutdownProducer", PyShutdownProducer); def("SetProducerNameServerAddress", PySetProducerNameServerAddress); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index ee7026e..8e8e27d 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -111,6 +111,7 @@ CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessa void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback); int PyDestroyProducer(void *producer); +int PyDestroyTransactionProducer(void *producer); int PyStartProducer(void *producer); int PyShutdownProducer(void *producer); int PySetProducerNameServerAddress(void *producer, const char *namesrv); diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index 9ab8cf2..179e1f1 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -32,7 +32,7 @@ def init_producer(): return producer def transaction_local_checker(msg): - print 'begin check for msg: ' + PyGetMessageId(msg) + print 'begin check for msg: ' + GetMessageId(msg) return TransactionStatus.E_COMMIT_TRANSACTION def init_transaction_producer(): From 0224a6b205ad9aaa395148c5bc58ead31de0ecc3 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Sun, 13 Oct 2019 21:30:19 +0800 Subject: [PATCH 04/23] feat(producer) remove batch message logic --- src/PythonWrapper.cpp | 30 ------------------------------ src/PythonWrapper.h | 7 ------- 2 files changed, 37 deletions(-) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index af80175..2783015 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -17,7 +17,6 @@ #include "CCommon.h" #include "CMessage.h" #include "CMessageExt.h" -#include "CBatchMessage.h" #include "CSendResult.h" #include "CProducer.h" #include "CPushConsumer.h" @@ -98,18 +97,6 @@ int PySetMessageDelayTimeLevel(void *msg, int level) { return SetDelayTimeLevel((CMessage *) msg, level); } -//batch message -void *PyCreateBatchMessage() { - return (void *) CreateBatchMessage(); -} - -int PyAddMessage(void *batchMsg, void *msg) { - return AddMessage((CBatchMessage *) batchMsg, (CMessage *) msg); -} - -int PyDestroyBatchMessage(void *batchMsg) { - return DestroyBatchMessage((CBatchMessage *) batchMsg); -} //messageExt const char *PyGetMessageTopic(PyMessageExt msgExt) { @@ -250,17 +237,6 @@ int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, (void *) pyCallback); } -PySendResult PySendBatchMessage(void *producer, void *batchMessage) { - PySendResult ret; - CSendResult result; - SendBatchMessage((CProducer *) producer, (CBatchMessage *) batchMessage, &result); - ret.sendStatus = result.sendStatus; - ret.offset = result.offset; - strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); - ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; - return ret; -} - PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) { PySendResult ret; @@ -488,11 +464,6 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SetMessageProperty", PySetMessageProperty); def("SetDelayTimeLevel", PySetMessageDelayTimeLevel); - //For batch message - def("CreateBatchMessage", PyCreateBatchMessage, return_value_policy()); - def("AddMessage", PyAddMessage); - def("DestroyBatchMessage", PyDestroyBatchMessage); - //For MessageExt def("GetMessageTopic", PyGetMessageTopic); def("GetMessageTags", PyGetMessageTags); @@ -522,7 +493,6 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SendMessageSync", PySendMessageSync); def("SendMessageAsync", PySendMessageAsync); - def("SendBatchMessage", PySendBatchMessage); def("SendMessageOneway", PySendMessageOneway); def("SendMessageOrderly", PySendMessageOrderly); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 8e8e27d..732320b 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -18,7 +18,6 @@ #include "CCommon.h" #include "CMessage.h" #include "CMessageExt.h" -#include "CBatchMessage.h" #include "CSendResult.h" #include "CProducer.h" #include "CPushConsumer.h" @@ -91,11 +90,6 @@ int PySetByteMessageBody(void *msg, const char *body, int len); int PySetMessageProperty(void *msg, const char *key, const char *value); int PySetMessageDelayTimeLevel(void *msg, int level); -//batch message -void *PyCreateBatchMessage(); -int PyAddMessage(void *batchMsg, void *msg); -int PyDestroyBatchMessage(void *batchMsg); - //messageExt const char *PyGetMessageTopic(PyMessageExt msgExt); const char *PyGetMessageTags(PyMessageExt msgExt); @@ -132,7 +126,6 @@ void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback); void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback); int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback); -PySendResult PySendBatchMessage(void *producer, void *msg); PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey); PySendResult PySendMessageInTransaction(void *producer , void *msg, PyObject *localTransactionExecuteCallback , void *args); From 0f74f8fbc2c0c7aa01c1e1e60d0ddd1fd872e04a Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Wed, 23 Oct 2019 19:16:33 +0800 Subject: [PATCH 05/23] feat(producer) add specified order producer --- src/PythonWrapper.cpp | 6 ++++++ src/PythonWrapper.h | 1 + 2 files changed, 7 insertions(+) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index 2783015..5fb1d53 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -124,6 +124,11 @@ void *PyCreateProducer(const char *groupId) { return (void *) CreateProducer(groupId); } +void *PyCreateOrderProducer(const char *groupId) { + PyEval_InitThreads(); // ensure create GIL, for call Python callback from C. + return (void *) CreateOrderlyProducer(groupId); +} + void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback) { PyEval_InitThreads(); CProducer *producer = CreateTransactionProducer(groupId, &PyLocalTransactionCheckerCallback, NULL); @@ -474,6 +479,7 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { //For producer def("CreateProducer", PyCreateProducer, return_value_policy()); + def("CreateOrderProducer", PyCreateOrderProducer, return_value_policy()); def("CreateTransactionProducer", PyCreateTransactionProducer, return_value_policy()); def("DestroyProducer", PyDestroyProducer); def("DestroyTransactionProducer", PyDestroyTransactionProducer); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 732320b..8fdb04f 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -100,6 +100,7 @@ const char *PyGetMessageId(PyMessageExt msgExt); //producer void *PyCreateProducer(const char *groupId); +void *PyCreateOrderProducer(const char *groupId); CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data); CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data); void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback); From e8e43cc36cca4d411b2751f531e5527700a717b2 Mon Sep 17 00:00:00 2001 From: dinglei Date: Mon, 4 Nov 2019 15:23:24 +0800 Subject: [PATCH 06/23] fix(Readme): use master svg in ci build result --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f52295c..3b1c67b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## RocketMQ Client Python [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) -[![TravisCI](https://travis-ci.org/apache/rocketmq-client-python.svg)](https://travis-ci.org/apache/rocketmq-client-python) +[![TravisCI](https://travis-ci.org/apache/rocketmq-client-python.svg?branch=master)](https://travis-ci.org/apache/rocketmq-client-python) * RocketMQ Python client is developed on top of [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years. ---------- From 08bc79c9fcd1c851433f6ad4fd44abac00c0d1ba Mon Sep 17 00:00:00 2001 From: dinglei Date: Wed, 20 Nov 2019 14:52:44 +0800 Subject: [PATCH 07/23] doc(readme): use cpp 1.2.4 for quick install doc(readme): use CPP 1.2.4 for quick install --- doc/Introduction.md | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/doc/Introduction.md b/doc/Introduction.md index 64863e9..c0f97da 100644 --- a/doc/Introduction.md +++ b/doc/Introduction.md @@ -36,20 +36,9 @@ ``` 4. [librocketmq](https://github.com/apache/rocketmq-client-cpp), choose one method below: - - make and install the RocketMQ library manually from [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp) + - build from source: [Build and Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install) - - quick install, please choose the suitable dynamic library version for your system. - ``` - mkdir rocketmqlib - cd rocketmqlib - wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/linux/1.2.2/RHEL7.X/rocketmq-client-cpp.tar.gz - tar -xzf rocketmq-client-cpp.tar.gz - cd rocketmq-client-cpp - sudo cp lib/librocketmq.so lib/librocketmq.a /usr/local/lib/ - mkdir -p /usr/local/include/rocketmq - sudo cp -r include/* /usr/local/include/rocketmq - ``` - + - download specific release: [rocketmq-client-cpp](https://www.apache.org/dyn/closer.cgi?path=rocketmq/rocketmq-client-cpp/1.2.4/rocketmq-client-cpp-1.2.4-bin-release.tar.gz) and unzip the package, please choose the right version according to your OS and unzip it, then copy the library files to to your `/usr/local/lib/` directory and copy the head files under `include` path to `/usr/local/include/rocketmq/`. and please make sure your `/usr/local/lib/` directory is under the `LD_LIBRARY_PATH`. * Make and install module manually 1. Using Dynamic RocketMQ and boost python libraries are recommended. @@ -90,13 +79,13 @@ ``` 4. [librocketmq](https://github.com/apache/rocketmq-client-cpp), choose one method below: - - make and install the RocketMQ library manually from [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp) + - build from source: [Build and Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install) - - quick install + - quick install: there are no cpp binary release for macos, below script can only be used for dev env. ``` mkdir rocketmqlib cd rocketmqlib - wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/mac/1.2.0/librocketmq.tar.gz + wget https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com/cpp-client/mac/1.2.4/librocketmq.tar.gz tar -xzf librocketmq.tar.gz cp librocketmq.dylib librocketmq.a /usr/local/lib/ cp -r rocketmq /usr/local/include/ @@ -104,21 +93,12 @@ * Make and install module manually - 1. Using Dynamic RocketMQ and boost python libraries are recommended. - ``` + ``` - mkdir build && cd build - cmake ../ -DBoost_USE_STATIC_LIBS=OFF -DROCKETMQ_USE_STATIC_LIBS=OFF - make - make install - ``` - - 2. Also you can using static libraries. - ``` - - mkdir build & cd build - - cmake ../ -DBoost_USE_STATIC_LIBS=ON -DROCKETMQ_USE_STATIC_LIBS=ON - - make - - make install - ``` + ``` * Check verion ``` strings librocketmqclientpython.so |grep PYTHON_CLIENT_VERSION From 09197815c69c32b73f98261c3a229f3c20381aea Mon Sep 17 00:00:00 2001 From: dinglei Date: Thu, 21 Nov 2019 21:58:07 +0800 Subject: [PATCH 08/23] doc(readme): add apache license v2 link. doc(readme): add apache license v2 link. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 15d2c2f..d06ee59 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # rocketmq-client-python +[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Build Status](https://travis-ci.org/apache/rocketmq-client-python.svg?branch=master)](https://travis-ci.org/apache/rocketmq-client-python) [![codecov](https://codecov.io/gh/apache/rocketmq-client-python/branch/ctypes/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq-client-python/branch/ctypes) [![PyPI](https://img.shields.io/pypi/v/rocketmq-client-python.svg)](https://pypi.org/project/rocketmq-client-python) From 9990413d7b6b7cb665bac700095bb2bab767e66e Mon Sep 17 00:00:00 2001 From: dinglei Date: Thu, 21 Nov 2019 22:02:35 +0800 Subject: [PATCH 09/23] doc(notice): update copyright year to 2019. doc(notice): update copyright year to 2019. --- NOTICE | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/NOTICE b/NOTICE index 703c28b..85e2dc3 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache RocketMQ -Copyright 2016-2018 The Apache Software Foundation +Copyright 2016-2019 The Apache Software Foundation This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file +The Apache Software Foundation (http://www.apache.org/). From 2eb90cf0eda63e17cae46f82aaaad89176553836 Mon Sep 17 00:00:00 2001 From: dinglei Date: Mon, 9 Dec 2019 21:17:02 +0800 Subject: [PATCH 10/23] fix(travisci): update rocketmq download link to apache archive server (#79) fix(travisci): update rocketmq download link to apache archive server (#79) --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 144215b..3108b88 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,7 @@ matrix: before_script: - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - - wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip + - wget https://archive.apache.org/dist/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip - unzip rocketmq-all-4.5.2-bin-release.zip - cd rocketmq-all-4.5.2-bin-release - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh From 62f90483556e6c14cf56d792a74b71fcbe4d382d Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 17 Dec 2019 20:14:01 +0800 Subject: [PATCH 11/23] feat(version): update version to 2.0.0 to release --- .bumpversion.cfg | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 8ce077d..f9c7db4 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -18,5 +18,5 @@ files = setup.py commit = True tag = True -current_version = 0.5.0-rc3 +current_version = 2.0.0 diff --git a/setup.py b/setup.py index 72952b2..074aeb6 100755 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ def finalize_options(self): setup( name='rocketmq-client-python', - version='0.5.0-rc3', + version='2.0.0', author='apache.rocketmq', author_email='dev@rocketmq.apache.org', packages=find_packages(exclude=('tests', 'tests.*')), From 6bf0e60c6d06a13610c39762bea762d025388ef9 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 17 Dec 2019 21:31:23 +0800 Subject: [PATCH 12/23] feat(version): update version to next release. --- .bumpversion.cfg | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index f9c7db4..9c0115f 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -18,5 +18,5 @@ files = setup.py commit = True tag = True -current_version = 2.0.0 +current_version = 2.0.0rc1 diff --git a/setup.py b/setup.py index 074aeb6..7bdcf81 100755 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ def finalize_options(self): setup( name='rocketmq-client-python', - version='2.0.0', + version='2.0.1rc1', author='apache.rocketmq', author_email='dev@rocketmq.apache.org', packages=find_packages(exclude=('tests', 'tests.*')), From a3f132b27e55bfa463445b9c868146d3328d6814 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 17 Dec 2019 21:31:56 +0800 Subject: [PATCH 13/23] feat(version): update version to next release. --- .bumpversion.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 9c0115f..f6d6051 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -18,5 +18,5 @@ files = setup.py commit = True tag = True -current_version = 2.0.0rc1 +current_version = 2.0.1rc1 From 3fb373d0c3ac7cdd040150993441cfd5895afdae Mon Sep 17 00:00:00 2001 From: dinglei Date: Fri, 20 Dec 2019 11:47:44 +0800 Subject: [PATCH 14/23] doc(readme): update prerequisites install. doc(readme): update prerequisites install. --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index d06ee59..d0fcfe7 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,27 @@ [![PyPI](https://img.shields.io/pypi/v/rocketmq-client-python.svg)](https://pypi.org/project/rocketmq-client-python) RocketMQ Python client, based on [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), supports Linux and macOS +## Prerequisites + +### Install `librocketmq` +rocketmq-client-python is a lightweight wrapper around [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), so you need install +`librocketmq` first. + +#### include file +```bash +git clone https://github.com/apache/rocketmq-client-cpp + +# By default, CFLAGS contains `/usr/local/include` +sudo mkdir -p /usr/local/include/rocketmq/ +sudo cp rocketmq-client-cpp/include/* /usr/local/include/rocketmq +``` + +#### binary library +your could download directly or build manually: + +- download specific release: [rocketmq-client-cpp](https://archive.apache.org/dist/rocketmq/rocketmq-client-cpp/1.2.4/rocketmq-client-cpp-1.2.4-bin-release.tar.gz) + and unzip the package, please choose the right version according to your OS and unzip it, then copy the library files to to your `LD_LIBRARY_PATH` directory(as default, `/usr/local/lib/` works finely). +- build from source: [Build and Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install) ## Installation From f01c884fe1f0902129dce59e24cb8bdcddcdc158 Mon Sep 17 00:00:00 2001 From: dinglei Date: Wed, 8 Jan 2020 13:08:06 +0800 Subject: [PATCH 15/23] fix(doc): update notice and open statistic label (#83) --- NOTICE | 2 +- README.md | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/NOTICE b/NOTICE index 85e2dc3..65ebdd0 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache RocketMQ -Copyright 2016-2019 The Apache Software Foundation +Copyright 2016-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/README.md b/README.md index d0fcfe7..835847e 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,10 @@ [![Build Status](https://travis-ci.org/apache/rocketmq-client-python.svg?branch=master)](https://travis-ci.org/apache/rocketmq-client-python) [![codecov](https://codecov.io/gh/apache/rocketmq-client-python/branch/ctypes/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq-client-python/branch/ctypes) [![PyPI](https://img.shields.io/pypi/v/rocketmq-client-python.svg)](https://pypi.org/project/rocketmq-client-python) +[![GitHub release](https://img.shields.io/badge/release-download-default.svg)](https://github.com/apache/rocketmq-client-python/releases) +[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-client-python.svg)](http://isitmaintained.com/project/apache/rocketmq-client-python "Average time to resolve an issue") +[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-client-python.svg)](http://isitmaintained.com/project/apache/rocketmq-client-python "Percentage of issues still open") +![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social) RocketMQ Python client, based on [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), supports Linux and macOS ## Prerequisites From 44824e4c20a04dbe0ab18adb499998c5c9156b74 Mon Sep 17 00:00:00 2001 From: dinglei Date: Fri, 14 Feb 2020 18:18:19 +0800 Subject: [PATCH 16/23] feat(doc): update cpp library install methods. (#84) * feat(doc): update cpp library install methods. * feat(doc): update cpp library install methods. --- README.md | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 835847e..f59d545 100644 --- a/README.md +++ b/README.md @@ -16,21 +16,32 @@ RocketMQ Python client, based on [rocketmq-client-cpp](https://github.com/apache rocketmq-client-python is a lightweight wrapper around [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), so you need install `librocketmq` first. -#### include file -```bash -git clone https://github.com/apache/rocketmq-client-cpp - -# By default, CFLAGS contains `/usr/local/include` -sudo mkdir -p /usr/local/include/rocketmq/ -sudo cp rocketmq-client-cpp/include/* /usr/local/include/rocketmq -``` - -#### binary library -your could download directly or build manually: - -- download specific release: [rocketmq-client-cpp](https://archive.apache.org/dist/rocketmq/rocketmq-client-cpp/1.2.4/rocketmq-client-cpp-1.2.4-bin-release.tar.gz) - and unzip the package, please choose the right version according to your OS and unzip it, then copy the library files to to your `LD_LIBRARY_PATH` directory(as default, `/usr/local/lib/` works finely). -- build from source: [Build and Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install) +#### Download by binary release. +download specific release according you OS: [rocketmq-client-cpp-2.0.0](https://github.com/apache/rocketmq-client-cpp/releases/tag/2.0.0) +- centos + + take centos7 as example, you can install the library in centos6 by the same method. + ```bash + wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm + sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm + ``` +- debian + ```bash + wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb + sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb + ``` +- macOS + ```bash + wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz + tar -xzf rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz + cd rocketmq-client-cpp + mkdir /usr/local/include/rocketmq + cp include/* /usr/local/include/rocketmq + cp lib/* /usr/local/lib + install_name_tool -id "@rpath/librocketmq.dylib" /usr/local/lib/librocketmq.dylib + ``` +#### Build from source +you can also build it manually from source according to [Build and Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install) ## Installation From 7e9ffd91c3f94a9e2ab3971ced6fb329a0c7e6ec Mon Sep 17 00:00:00 2001 From: Lin Date: Fri, 8 May 2020 14:38:11 +0800 Subject: [PATCH 17/23] [ISSUE #88]Sample Callback should return ConsumeStatus (#90) --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f59d545..2140022 100644 --- a/README.md +++ b/README.md @@ -74,11 +74,12 @@ producer.shutdown() ```python import time -from rocketmq.client import PushConsumer +from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body) + return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer('CID_XXX') From 9a38c4314abbc290396c75de28054898214c3d3f Mon Sep 17 00:00:00 2001 From: sicklife Date: Wed, 29 Jul 2020 22:12:57 +0800 Subject: [PATCH 18/23] add set_instance_name function for producer --- rocketmq/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rocketmq/client.py b/rocketmq/client.py index 7d2ce0c..106e321 100644 --- a/rocketmq/client.py +++ b/rocketmq/client.py @@ -230,6 +230,9 @@ def send_orderly_with_sharding_key(self, msg, sharding_key): def set_group(self, group_name): ffi_check(dll.SetProducerGroupName(self._handle, _to_bytes(group_name))) + def set_instance_name(self, name): + ffi_check(dll.SetProducerInstanceName(self._handle, _to_bytes(name))) + def set_name_server_address(self, addr): ffi_check(dll.SetProducerNameServerAddress(self._handle, _to_bytes(addr))) From 58397dc86d85d1b16a4d949c430e7499722050b5 Mon Sep 17 00:00:00 2001 From: ziyht Date: Tue, 13 Oct 2020 04:09:40 +0800 Subject: [PATCH 19/23] [ISSUE #96] Fix the memory leak in Message (#98) Message created by cpp library not released in class Message --- rocketmq/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rocketmq/client.py b/rocketmq/client.py index 106e321..88ae128 100644 --- a/rocketmq/client.py +++ b/rocketmq/client.py @@ -71,6 +71,9 @@ def _to_bytes(s): class Message(object): def __init__(self, topic): self._handle = dll.CreateMessage(_to_bytes(topic)) + + def __del__(self): + dll.DestroyMessage(self._handle) def set_keys(self, keys): ffi_check(dll.SetMessageKeys(self._handle, _to_bytes(keys))) From 89ad6bde4351ecafa582770a3826ac9c660134b9 Mon Sep 17 00:00:00 2001 From: tangzhongrui Date: Sat, 17 Oct 2020 10:02:49 +0800 Subject: [PATCH 20/23] Add a function which shows how to use rocketmq in multi-threaded scenarios properly to handle exception such as Namer Server Cluster and Broker Cluster restart --- samples/producer.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/samples/producer.py b/samples/producer.py index fb90b7b..f69534c 100644 --- a/samples/producer.py +++ b/samples/producer.py @@ -19,11 +19,12 @@ from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus import time +import threading topic = 'TopicTest' gid = 'test' name_srv = '127.0.0.1:9876' - +MUTEX = threading.Lock() def create_message(): msg = Message(topic) @@ -46,6 +47,38 @@ def send_message_sync(count): producer.shutdown() +def send_message_multi_threaded(retry_time): + producer = Producer(gid) + producer.set_name_server_address(name_srv) + msg = create_message() + + global MUTEX + MUTEX.acquire() + try: + producer.start() + except Exception as e: + print('ProducerStartFailed:', e) + MUTEX.release() + return + + try: + for i in range(retry_time): + ret = producer.send_sync(msg) + if ret.status == 0: + print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) + break + else: + print('send message to MQ failed.') + if i == (retry_time - 1): + print('send message to MQ failed after retries.') + except Exception as e: + print('ProducerSendSyncFailed:', e) + finally: + producer.shutdown() + MUTEX.release() + return + + def send_orderly_with_sharding_key(count): producer = Producer(gid, True) producer.set_name_server_address(name_srv) From d074515b0104354d1b54ad53a110e49413f8d889 Mon Sep 17 00:00:00 2001 From: messense Date: Tue, 1 Dec 2020 16:56:03 +0800 Subject: [PATCH 21/23] Add a multi-thread producer test case (#102) --- dev-requirements.txt | 1 + tests/test_producer.py | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index 08d8b97..c96c923 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -17,3 +17,4 @@ pytest pytest-timeout pytest-faulthandler pytest-cov +futures; python_version < '3' diff --git a/tests/test_producer.py b/tests/test_producer.py index ee71ac8..311bf71 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from concurrent.futures import ThreadPoolExecutor import time import threading import sys @@ -34,6 +35,16 @@ def test_producer_send_sync(producer): assert ret.status == SendStatus.OK +def test_producer_send_sync_multi_thread(producer): + executor = ThreadPoolExecutor(max_workers=5) + futures = [] + for _ in range(5): + futures.append(executor.submit(test_producer_send_sync, producer)) + + for future in futures: + _ret = future.result() + + def test_producer_send_oneway(producer): msg = Message('test') msg.set_keys('send_oneway') From 71363a7d50cbdc50e2fcd62e2fd2ccc02018c0fb Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 20 Mar 2024 20:22:20 +0800 Subject: [PATCH 22/23] support trace (#136) --- rocketmq/client.py | 11 ++++++++++- rocketmq/ffi.py | 10 ++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/rocketmq/client.py b/rocketmq/client.py index 88ae128..fecac9c 100644 --- a/rocketmq/client.py +++ b/rocketmq/client.py @@ -23,7 +23,7 @@ from .ffi import ( dll, _CSendResult, MSG_CALLBACK_FUNC, MessageModel, TRANSACTION_CHECK_CALLBACK, - LOCAL_TRANSACTION_EXECUTE_CALLBACK + LOCAL_TRANSACTION_EXECUTE_CALLBACK, TraceModel ) from .exceptions import ( ffi_check, NullPointerException, @@ -259,6 +259,9 @@ def set_compress_level(self, level): def set_max_message_size(self, max_size): ffi_check(dll.SetProducerMaxMessageSize(self._handle, max_size)) + def set_message_trace(self, message_trace): + ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) + def start(self): ffi_check(dll.StartProducer(self._handle)) @@ -311,6 +314,9 @@ def __exit__(self, exec_type, value, traceback): def set_name_server_address(self, addr): ffi_check(dll.SetProducerNameServerAddress(self._handle, _to_bytes(addr))) + def set_message_trace(self, message_trace): + ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) + def start(self): ffi_check(dll.StartProducer(self._handle)) @@ -437,3 +443,6 @@ def set_message_batch_max_size(self, max_size): def set_instance_name(self, name): ffi_check(dll.SetPushConsumerInstanceName(self._handle, _to_bytes(name))) + + def set_message_trace(self, message_trace): + ffi_check(dll.SetPushConsumerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) diff --git a/rocketmq/ffi.py b/rocketmq/ffi.py index 21a01df..b4f6a11 100644 --- a/rocketmq/ffi.py +++ b/rocketmq/ffi.py @@ -81,6 +81,11 @@ class MessageModel(CtypesEnum): CLUSTERING = 1 +class TraceModel(CtypesEnum): + OPEN = 0 + CLOSE = 1 + + class _CSendResult(Structure): _fields_ = [ ('sendStatus', c_int), @@ -200,6 +205,8 @@ class _CMQException(Structure): dll.SetProducerCompressLevel.restype = _CStatus dll.SetProducerMaxMessageSize.argtypes = [c_void_p, c_int] dll.SetProducerMaxMessageSize.restype = _CStatus +dll.SetProducerMessageTrace.argtypes = [c_void_p, TraceModel] +dll.SetProducerMessageTrace.restype = _CStatus dll.SendMessageSync.argtypes = [c_void_p, c_void_p, POINTER(_CSendResult)] dll.SendMessageSync.restype = _CStatus dll.SendMessageOneway.argtypes = [c_void_p, c_void_p] @@ -261,6 +268,9 @@ class _CMQException(Structure): dll.SetPushConsumerLogLevel.restype = _CStatus dll.SetPushConsumerMessageModel.argtypes = [c_void_p, MessageModel] dll.SetPushConsumerMessageModel.restype = _CStatus +dll.SetPushConsumerLogLevel.restype = _CStatus +dll.SetPushConsumerMessageTrace.argtypes = [c_void_p, TraceModel] +dll.SetPushConsumerMessageTrace.restype = _CStatus # Misc dll.GetLatestErrorMessage.argtypes = [] From 143af507040d30173908b96b81f338e34e1f81e9 Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Thu, 18 Dec 2025 14:48:36 +0800 Subject: [PATCH 23/23] Support ssl. (#157) * Support ssl. * Added compatibility code for the SSL interface. --------- Co-authored-by: hexueyuan --- rocketmq/client.py | 14 ++++++++++++++ rocketmq/ffi.py | 14 ++++++++++++++ samples/consumer.py | 4 +++- samples/producer.py | 17 +++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/rocketmq/client.py b/rocketmq/client.py index fecac9c..bb8e9d4 100644 --- a/rocketmq/client.py +++ b/rocketmq/client.py @@ -262,6 +262,13 @@ def set_max_message_size(self, max_size): def set_message_trace(self, message_trace): ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) + def set_ssl_enable(self, enable): + ssl_enable_code = 1 if enable else 0 + ffi_check(dll.SetProducerSsl(self._handle, ssl_enable_code)) + + def set_ssl_property_file(self, file_path): + ffi_check(dll.SetProducerSslPropertyFile(self._handle, _to_bytes(file_path))) + def start(self): ffi_check(dll.StartProducer(self._handle)) @@ -401,6 +408,13 @@ def set_session_credentials(self, access_key, access_secret, channel): _to_bytes(channel) )) + def set_ssl_enable(self, enable): + ssl_enable_code = 1 if enable else 0 + ffi_check(dll.SetPushConsumerSsl(self._handle, ssl_enable_code)) + + def set_ssl_property_file(self, file_path): + ffi_check(dll.SetPushConsumerSslPropertyFile(self._handle, _to_bytes(file_path))) + def subscribe(self, topic, callback, expression='*'): def _on_message(consumer, msg): exc = None diff --git a/rocketmq/ffi.py b/rocketmq/ffi.py index b4f6a11..0eca2ca 100644 --- a/rocketmq/ffi.py +++ b/rocketmq/ffi.py @@ -207,6 +207,13 @@ class _CMQException(Structure): dll.SetProducerMaxMessageSize.restype = _CStatus dll.SetProducerMessageTrace.argtypes = [c_void_p, TraceModel] dll.SetProducerMessageTrace.restype = _CStatus +try: + dll.SetProducerSsl.argtypes = [c_void_p, c_int] + dll.SetProducerSsl.restype = _CStatus + dll.SetProducerSslPropertyFile.argtypes = [c_void_p, c_char_p] + dll.SetProducerSslPropertyFile.restype = _CStatus +except AttributeError: + pass dll.SendMessageSync.argtypes = [c_void_p, c_void_p, POINTER(_CSendResult)] dll.SendMessageSync.restype = _CStatus dll.SendMessageOneway.argtypes = [c_void_p, c_void_p] @@ -271,6 +278,13 @@ class _CMQException(Structure): dll.SetPushConsumerLogLevel.restype = _CStatus dll.SetPushConsumerMessageTrace.argtypes = [c_void_p, TraceModel] dll.SetPushConsumerMessageTrace.restype = _CStatus +try: + dll.SetPushConsumerSsl.argtypes = [c_void_p, c_int] + dll.SetPushConsumerSsl.restype = _CStatus + dll.SetPushConsumerSslPropertyFile.argtypes = [c_void_p, c_char_p] + dll.SetPushConsumerSslPropertyFile.restype = _CStatus +except AttributeError: + pass # Misc dll.GetLatestErrorMessage.argtypes = [] diff --git a/samples/consumer.py b/samples/consumer.py index b95da79..bf36c4a 100644 --- a/samples/consumer.py +++ b/samples/consumer.py @@ -27,7 +27,9 @@ def callback(msg): def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('127.0.0.1:9876') - consumer.subscribe('TopicTest', callback) + consumer.subscribe('BenchmarkTest', callback) + # consumer.set_ssl_enable(True) + # consumer.set_ssl_property_file("/etc/rocketmq/tls.properties") print ('start consume message') consumer.start() diff --git a/samples/producer.py b/samples/producer.py index f69534c..4869b77 100644 --- a/samples/producer.py +++ b/samples/producer.py @@ -115,6 +115,23 @@ def send_transaction_message(count): time.sleep(3600) +def send_message_with_ssl(count): + producer = Producer(gid) + producer.set_name_server_address(name_srv) + producer.set_ssl_enable(True) + producer.set_ssl_property_file("/etc/rocketmq/tls.properties") + producer.start() + for n in range(count): + msg = create_message() + producer.start() + for n in range(count): + msg = create_message() + ret = producer.send_sync(msg) + print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) + print ('send sync message done') + producer.shutdown() + + if __name__ == '__main__': send_message_sync(10)