diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 8ce077d..f6d6051 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -18,5 +18,5 @@ files = setup.py commit = True tag = True -current_version = 0.5.0-rc3 +current_version = 2.0.1rc1 diff --git a/.travis.yml b/.travis.yml index 144215b..3108b88 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,7 @@ matrix: before_script: - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - - wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip + - wget https://archive.apache.org/dist/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip - unzip rocketmq-all-4.5.2-bin-release.zip - cd rocketmq-all-4.5.2-bin-release - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh diff --git a/NOTICE b/NOTICE index 703c28b..65ebdd0 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache RocketMQ -Copyright 2016-2018 The Apache Software Foundation +Copyright 2016-2020 The Apache Software Foundation This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file +The Apache Software Foundation (http://www.apache.org/). diff --git a/README.md b/README.md index 15d2c2f..2140022 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,47 @@ # rocketmq-client-python +[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Build Status](https://travis-ci.org/apache/rocketmq-client-python.svg?branch=master)](https://travis-ci.org/apache/rocketmq-client-python) [![codecov](https://codecov.io/gh/apache/rocketmq-client-python/branch/ctypes/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq-client-python/branch/ctypes) [![PyPI](https://img.shields.io/pypi/v/rocketmq-client-python.svg)](https://pypi.org/project/rocketmq-client-python) +[![GitHub release](https://img.shields.io/badge/release-download-default.svg)](https://github.com/apache/rocketmq-client-python/releases) +[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-client-python.svg)](http://isitmaintained.com/project/apache/rocketmq-client-python "Average time to resolve an issue") +[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-client-python.svg)](http://isitmaintained.com/project/apache/rocketmq-client-python "Percentage of issues still open") +![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social) RocketMQ Python client, based on [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), supports Linux and macOS +## Prerequisites + +### Install `librocketmq` +rocketmq-client-python is a lightweight wrapper around [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), so you need install +`librocketmq` first. + +#### Download by binary release. +download specific release according you OS: [rocketmq-client-cpp-2.0.0](https://github.com/apache/rocketmq-client-cpp/releases/tag/2.0.0) +- centos + + take centos7 as example, you can install the library in centos6 by the same method. + ```bash + wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm + sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm + ``` +- debian + ```bash + wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb + sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb + ``` +- macOS + ```bash + wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz + tar -xzf rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz + cd rocketmq-client-cpp + mkdir /usr/local/include/rocketmq + cp include/* /usr/local/include/rocketmq + cp lib/* /usr/local/lib + install_name_tool -id "@rpath/librocketmq.dylib" /usr/local/lib/librocketmq.dylib + ``` +#### Build from source +you can also build it manually from source according to [Build and Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install) ## Installation @@ -37,11 +74,12 @@ producer.shutdown() ```python import time -from rocketmq.client import PushConsumer +from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body) + return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer('CID_XXX') diff --git a/dev-requirements.txt b/dev-requirements.txt index 08d8b97..c96c923 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -17,3 +17,4 @@ pytest pytest-timeout pytest-faulthandler pytest-cov +futures; python_version < '3' diff --git a/rocketmq/client.py b/rocketmq/client.py index 7d2ce0c..bb8e9d4 100644 --- a/rocketmq/client.py +++ b/rocketmq/client.py @@ -23,7 +23,7 @@ from .ffi import ( dll, _CSendResult, MSG_CALLBACK_FUNC, MessageModel, TRANSACTION_CHECK_CALLBACK, - LOCAL_TRANSACTION_EXECUTE_CALLBACK + LOCAL_TRANSACTION_EXECUTE_CALLBACK, TraceModel ) from .exceptions import ( ffi_check, NullPointerException, @@ -71,6 +71,9 @@ def _to_bytes(s): class Message(object): def __init__(self, topic): self._handle = dll.CreateMessage(_to_bytes(topic)) + + def __del__(self): + dll.DestroyMessage(self._handle) def set_keys(self, keys): ffi_check(dll.SetMessageKeys(self._handle, _to_bytes(keys))) @@ -230,6 +233,9 @@ def send_orderly_with_sharding_key(self, msg, sharding_key): def set_group(self, group_name): ffi_check(dll.SetProducerGroupName(self._handle, _to_bytes(group_name))) + def set_instance_name(self, name): + ffi_check(dll.SetProducerInstanceName(self._handle, _to_bytes(name))) + def set_name_server_address(self, addr): ffi_check(dll.SetProducerNameServerAddress(self._handle, _to_bytes(addr))) @@ -253,6 +259,16 @@ def set_compress_level(self, level): def set_max_message_size(self, max_size): ffi_check(dll.SetProducerMaxMessageSize(self._handle, max_size)) + def set_message_trace(self, message_trace): + ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) + + def set_ssl_enable(self, enable): + ssl_enable_code = 1 if enable else 0 + ffi_check(dll.SetProducerSsl(self._handle, ssl_enable_code)) + + def set_ssl_property_file(self, file_path): + ffi_check(dll.SetProducerSslPropertyFile(self._handle, _to_bytes(file_path))) + def start(self): ffi_check(dll.StartProducer(self._handle)) @@ -305,6 +321,9 @@ def __exit__(self, exec_type, value, traceback): def set_name_server_address(self, addr): ffi_check(dll.SetProducerNameServerAddress(self._handle, _to_bytes(addr))) + def set_message_trace(self, message_trace): + ffi_check(dll.SetProducerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) + def start(self): ffi_check(dll.StartProducer(self._handle)) @@ -389,6 +408,13 @@ def set_session_credentials(self, access_key, access_secret, channel): _to_bytes(channel) )) + def set_ssl_enable(self, enable): + ssl_enable_code = 1 if enable else 0 + ffi_check(dll.SetPushConsumerSsl(self._handle, ssl_enable_code)) + + def set_ssl_property_file(self, file_path): + ffi_check(dll.SetPushConsumerSslPropertyFile(self._handle, _to_bytes(file_path))) + def subscribe(self, topic, callback, expression='*'): def _on_message(consumer, msg): exc = None @@ -431,3 +457,6 @@ def set_message_batch_max_size(self, max_size): def set_instance_name(self, name): ffi_check(dll.SetPushConsumerInstanceName(self._handle, _to_bytes(name))) + + def set_message_trace(self, message_trace): + ffi_check(dll.SetPushConsumerMessageTrace(self._handle, message_trace and TraceModel.OPEN or TraceModel.CLOSE)) diff --git a/rocketmq/ffi.py b/rocketmq/ffi.py index 21a01df..0eca2ca 100644 --- a/rocketmq/ffi.py +++ b/rocketmq/ffi.py @@ -81,6 +81,11 @@ class MessageModel(CtypesEnum): CLUSTERING = 1 +class TraceModel(CtypesEnum): + OPEN = 0 + CLOSE = 1 + + class _CSendResult(Structure): _fields_ = [ ('sendStatus', c_int), @@ -200,6 +205,15 @@ class _CMQException(Structure): dll.SetProducerCompressLevel.restype = _CStatus dll.SetProducerMaxMessageSize.argtypes = [c_void_p, c_int] dll.SetProducerMaxMessageSize.restype = _CStatus +dll.SetProducerMessageTrace.argtypes = [c_void_p, TraceModel] +dll.SetProducerMessageTrace.restype = _CStatus +try: + dll.SetProducerSsl.argtypes = [c_void_p, c_int] + dll.SetProducerSsl.restype = _CStatus + dll.SetProducerSslPropertyFile.argtypes = [c_void_p, c_char_p] + dll.SetProducerSslPropertyFile.restype = _CStatus +except AttributeError: + pass dll.SendMessageSync.argtypes = [c_void_p, c_void_p, POINTER(_CSendResult)] dll.SendMessageSync.restype = _CStatus dll.SendMessageOneway.argtypes = [c_void_p, c_void_p] @@ -261,6 +275,16 @@ class _CMQException(Structure): dll.SetPushConsumerLogLevel.restype = _CStatus dll.SetPushConsumerMessageModel.argtypes = [c_void_p, MessageModel] dll.SetPushConsumerMessageModel.restype = _CStatus +dll.SetPushConsumerLogLevel.restype = _CStatus +dll.SetPushConsumerMessageTrace.argtypes = [c_void_p, TraceModel] +dll.SetPushConsumerMessageTrace.restype = _CStatus +try: + dll.SetPushConsumerSsl.argtypes = [c_void_p, c_int] + dll.SetPushConsumerSsl.restype = _CStatus + dll.SetPushConsumerSslPropertyFile.argtypes = [c_void_p, c_char_p] + dll.SetPushConsumerSslPropertyFile.restype = _CStatus +except AttributeError: + pass # Misc dll.GetLatestErrorMessage.argtypes = [] diff --git a/samples/consumer.py b/samples/consumer.py index b95da79..bf36c4a 100644 --- a/samples/consumer.py +++ b/samples/consumer.py @@ -27,7 +27,9 @@ def callback(msg): def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('127.0.0.1:9876') - consumer.subscribe('TopicTest', callback) + consumer.subscribe('BenchmarkTest', callback) + # consumer.set_ssl_enable(True) + # consumer.set_ssl_property_file("/etc/rocketmq/tls.properties") print ('start consume message') consumer.start() diff --git a/samples/producer.py b/samples/producer.py index fb90b7b..4869b77 100644 --- a/samples/producer.py +++ b/samples/producer.py @@ -19,11 +19,12 @@ from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus import time +import threading topic = 'TopicTest' gid = 'test' name_srv = '127.0.0.1:9876' - +MUTEX = threading.Lock() def create_message(): msg = Message(topic) @@ -46,6 +47,38 @@ def send_message_sync(count): producer.shutdown() +def send_message_multi_threaded(retry_time): + producer = Producer(gid) + producer.set_name_server_address(name_srv) + msg = create_message() + + global MUTEX + MUTEX.acquire() + try: + producer.start() + except Exception as e: + print('ProducerStartFailed:', e) + MUTEX.release() + return + + try: + for i in range(retry_time): + ret = producer.send_sync(msg) + if ret.status == 0: + print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) + break + else: + print('send message to MQ failed.') + if i == (retry_time - 1): + print('send message to MQ failed after retries.') + except Exception as e: + print('ProducerSendSyncFailed:', e) + finally: + producer.shutdown() + MUTEX.release() + return + + def send_orderly_with_sharding_key(count): producer = Producer(gid, True) producer.set_name_server_address(name_srv) @@ -82,6 +115,23 @@ def send_transaction_message(count): time.sleep(3600) +def send_message_with_ssl(count): + producer = Producer(gid) + producer.set_name_server_address(name_srv) + producer.set_ssl_enable(True) + producer.set_ssl_property_file("/etc/rocketmq/tls.properties") + producer.start() + for n in range(count): + msg = create_message() + producer.start() + for n in range(count): + msg = create_message() + ret = producer.send_sync(msg) + print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) + print ('send sync message done') + producer.shutdown() + + if __name__ == '__main__': send_message_sync(10) diff --git a/setup.py b/setup.py index 72952b2..7bdcf81 100755 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ def finalize_options(self): setup( name='rocketmq-client-python', - version='0.5.0-rc3', + version='2.0.1rc1', author='apache.rocketmq', author_email='dev@rocketmq.apache.org', packages=find_packages(exclude=('tests', 'tests.*')), diff --git a/tests/test_producer.py b/tests/test_producer.py index ee71ac8..311bf71 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from concurrent.futures import ThreadPoolExecutor import time import threading import sys @@ -34,6 +35,16 @@ def test_producer_send_sync(producer): assert ret.status == SendStatus.OK +def test_producer_send_sync_multi_thread(producer): + executor = ThreadPoolExecutor(max_workers=5) + futures = [] + for _ in range(5): + futures.append(executor.submit(test_producer_send_sync, producer)) + + for future in futures: + _ret = future.result() + + def test_producer_send_oneway(producer): msg = Message('test') msg.set_keys('send_oneway')