Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #100]Feature support gRPC: Fix bugs #102

Open
wants to merge 1 commit into
base: feature_support_grpc_copy
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions v2/nacos/config/abstract/abstract_config_transport_client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import hashlib
import sched
import time
from abc import ABCMeta, abstractmethod
from concurrent.futures import ThreadPoolExecutor

from v2.nacos.common.constants import Constants
from v2.nacos.common.utils import get_current_time_millis
from v2.nacos.config.filter_impl.config_response import ConfigResponse
from v2.nacos.config.impl.server_list_manager import ServerListManager
from v2.nacos.property_key_constants import PropertyKeyConstants
from v2.nacos.security.security_proxy import SecurityProxy
from v2.nacos.utils.param_utils import ParamUtils


class AbstractConfigTransportClient(metaclass=ABCMeta):
Expand Down Expand Up @@ -62,20 +59,6 @@ def _get_security_headers(self) -> dict:
return security_headers

def _get_common_header(self) -> dict:
# ts = str(get_current_time_millis())
# md = hashlib.md5()
# md.update((ts + ParamUtil.get_app_key()).encode(Constants.ENCODE))
# token = md.hexdigest()
#
# headers = {
# Constants.CLIENT_APPNAME_HEADER: ParamUtil.get_app_name(),
# Constants.CLIENT_REQUEST_TS_HEADER: ts,
# Constants.CLIENT_REQUEST_TOKEN_HEADER: token,
# AbstractConfigTransportClient.CONFIG_INFO_HEADER: AbstractConfigTransportClient.DEFAULT_CONFIG_INFO,
# Constants.CHARSET_KEY: self.encode
# }
#
# return headers
# todo
return {}

Expand Down
75 changes: 69 additions & 6 deletions v2/nacos/config/common/group_key.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from v2.nacos.exception.nacos_exception import NacosException


class GroupKey:
PLUS = '+'

Expand All @@ -11,20 +14,80 @@ class GroupKey:

@staticmethod
def get_key(data_id: str, group: str, datum_str: str) -> str:
pass
return GroupKey.__do_get_key(data_id, group, datum_str)

@staticmethod
def get_key_tenant(data_id: str, group: str, tenant: str) -> str:
pass
return GroupKey.__do_get_key(data_id, group, tenant)

@staticmethod
def parse_key(group_key: str) -> list:
pass
sb = ""
data_id = None
group = None
tenant = None

i = 0
while i < len(group_key):
c = group_key[i]
if GroupKey.PLUS == c:
if not data_id:
data_id = sb
sb = ""
elif not group:
group = sb
sb = ""
else:
raise NacosException("invalid group_key:"+group_key)
elif GroupKey.PERCENT == c:
i += 1
next_c = group_key[i]
i += 1
next_next_c = group_key[i]
if GroupKey.TWO == next_c and GroupKey.B == next_next_c:
sb += GroupKey.PLUS
elif GroupKey.TWO == next_c and GroupKey.FIVE == next_next_c:
sb += GroupKey.PERCENT
else:
raise NacosException("invalid group_key:"+group_key)
else:
sb += c

if not group:
group = sb
else:
tenant = sb

if not data_id:
raise NacosException("invalid data_id")
if not group:
raise NacosException("invalid group")
return [data_id, group, tenant]

@staticmethod
def url_encode(string: str) -> str:
pass
sb = ""
for c in string:
if GroupKey.PLUS == c:
sb += "%2B"
elif GroupKey.PERCENT == c:
sb += "%25"
else:
sb += c
return sb

@staticmethod
def __do_get_key(self, data_id: str, group: str, datum_str: str) -> str:
pass
def __do_get_key(data_id: str, group: str, datum_str: str) -> str:
if not data_id or not data_id.strip():
raise NacosException("invalid dataId")
if not group or not group.strip():
raise NacosException("invalid group")

sb = ""
sb += GroupKey.url_encode(data_id)
sb += GroupKey.PLUS
sb += GroupKey.url_encode(group)
if datum_str:
sb += GroupKey.PLUS
sb += GroupKey.url_encode(datum_str)
return sb
18 changes: 6 additions & 12 deletions v2/nacos/config/impl/cache_data.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import hashlib
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from v2.nacos.common.constants import Constants
from v2.nacos.common.utils import get_current_time_millis
from v2.nacos.config.abstract.abstract_config_change_listener import AbstractConfigChangeListener
from v2.nacos.config.abstract.abstract_shared_listener import AbstractSharedListener
from v2.nacos.config.filter_impl.config_response import ConfigResponse
from v2.nacos.config.ilistener import Listener
from v2.nacos.config.impl.config_change_event import ConfigChangeEvent
from v2.nacos.config.impl.config_change_handler import ConfigChangeHandler
from v2.nacos.config.impl.local_config_info_processor import LocalConfigInfoProcessor
from v2.nacos.config.impl.local_encrypted_data_key_processor import LocalEncryptedDataKeyProcessor
from v2.nacos.exception.nacos_exception import NacosException
Expand Down Expand Up @@ -38,7 +31,7 @@ def __init__(self, logger, config_filter_chain_manager, name, data_id, group, te
self.use_local_config = False
self.local_config_last_modified = None

self.encrypted_data_key = self.__load_encrypted_data_key_from_disk_local()
self.encrypted_data_key = self.__load_encrypted_data_key_from_disk_local(name, data_id, group, tenant)
self.last_modified_ts = 0
self.task_id = None
self.initializing = True
Expand Down Expand Up @@ -90,9 +83,10 @@ def add_listener(self, listener) -> None:
def remove_listener(self, listener) -> None:
if not listener:
raise NacosException("[ArgumentException]Listener is None")
wrap = CacheData.ManagerListenerWrap(listener=listener)
try:
self.listeners.remove(wrap)
for listener_wrap in self.listeners:
if listener_wrap.listener is listener:
self.listeners.remove(listener_wrap)
self.logger.info("[%s][remove-listener] ok, dataId=%s, group=%s, cnt=%s"
% (self.name, self.data_id, self.group, len(self.listeners)))
except NacosException as e:
Expand Down Expand Up @@ -149,7 +143,7 @@ def check_listener_md5_consistent(self) -> bool:

def __safe_notify_listener(
self, data_id, group, content, config_type, md5, encrypted_dat_key, listener_wrap) -> None:
# todo uncompleted
# todo
# def job():
# start = get_current_time_millis()
# try:
Expand Down Expand Up @@ -237,7 +231,7 @@ def get_md5_str(config: str) -> str:
return Constants.NULL
else:
md5 = hashlib.md5()
md5.update(config)
md5.update(config.encode("utf-8"))
return md5.hexdigest()

def __load_cache_content_from_disk_local(self, name: str, data_id: str, group: str, tenant: str) -> str:
Expand Down
78 changes: 43 additions & 35 deletions v2/nacos/config/impl/client_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, logger, config_filter_chain_manager, server_list_manager, pro
self.uuid = uuid.uuid4()
self.timeout = None
self.agent = ClientWorker.ConfigRpcTransportClient(
logger, properties, server_list_manager, self.cache_map, self
logger, properties, server_list_manager, self
)
self.task_penalty_time = None
self.enable_remote_sync_config = False
Expand All @@ -60,7 +60,7 @@ def __init__(self, logger, config_filter_chain_manager, server_list_manager, pro
self.agent.start()

def __init_properties(self, properties: dict) -> None:
# todo 尚未完善
# todo
self.enable_remote_sync_config = properties.get(PropertyKeyConstants.ENABLE_REMOTE_SYNC_CONFIG)
if not self.enable_remote_sync_config:
self.enable_remote_sync_config = False
Expand All @@ -86,6 +86,10 @@ def add_tenant_listeners(self, data_id: str, group: str, listeners: list) -> Non
group = self.__blank_2_default_group(group)
tenant = self.agent.get_tenant()
cache = self.add_cache_data_if_absent(data_id, group, tenant)

# debug
# print("at add_tenant_listener, cache_map:", str(self.cache_map))

with self.lock:
for listener in listeners:
cache.add_listener(listener)
Expand Down Expand Up @@ -127,7 +131,11 @@ def remove_tenant_listener(self, data_id: str, group: str, listener: Listener) -
def remove_cache(self, data_id: str, group: str, tenant: str) -> None:
group_key = GroupKey.get_key_tenant(data_id, group, tenant)
with self.lock:
self.cache_map.pop(group_key)
# debug
# print("remove", group_key, "from:", self.cache_map)
copy = self.cache_map.copy()
copy.pop(group_key)
self.cache_map = copy
self.logger.info("[%s] [unsubscribe] %s" % (self.agent.get_name(), group_key))

def remove_config(self, data_id: str, group: str, tenant: str, tag: str):
Expand All @@ -145,21 +153,28 @@ def add_cache_data_if_absent(self, data_id: str, group: str, tenant: str) -> Cac
return cache

key = GroupKey.get_key(data_id, group, tenant)
cache = CacheData(
self.logger, self.config_filter_chain_manager, self.agent.get_name(), data_id, group, "")

with self.lock:
cache_from_map = self.get_cache(data_id, group, tenant)
if cache_from_map:
cache = cache_from_map
cache.set_sync_with_server(True)
else:
cache = CacheData(
self.logger, self.config_filter_chain_manager, self.agent.get_name(), data_id, group, tenant
)
task_id = len(self.cache_map) / CacheData.PER_TASK_CONFIG_SIZE
cache.set_task_id(int(task_id))
if self.enable_remote_sync_config:
response = self.get_server_config(data_id, group, tenant, 3000, False)
cache.set_content(response.get_content())
self.cache_map[key] = cache

copy = self.cache_map.copy()
copy[key] = cache
self.cache_map = copy

# debug
# print("add cache:", str(self.cache_map))

self.logger.info("[%s] [subscribe] %s" % (self.agent.get_name(), key))

Expand All @@ -170,7 +185,8 @@ def get_cache(self, data_id: str, group: str, tenant: str) -> CacheData:
raise NacosException()
return self.cache_map.get(GroupKey.get_key_tenant(data_id, group, tenant))

def get_server_config(self, data_id: str, group: str, tenant: str, read_timeout: int, notify: bool) -> ConfigResponse:
def get_server_config(
self, data_id: str, group: str, tenant: str, read_timeout: int, notify: bool) -> ConfigResponse:
if not group:
group = Constants.DEFAULT_GROUP
return self.agent.query_config(data_id, group, tenant, read_timeout, notify)
Expand All @@ -196,14 +212,13 @@ class ConfigRpcTransportClient(AbstractConfigTransportClient):
RPC_AGENT_NAME = "config_rpc_client"

def __init__(
self, logger, properties: dict, server_list_manager: ServerListManager, cache_map: dict, client_worker
self, logger, properties: dict, server_list_manager: ServerListManager, client_worker
):
super().__init__(logger, properties, server_list_manager)
self.listen_execute_bell = queue.Queue()
self.bell_item = object()
self.last_all_sync_time = get_current_time_millis()
self.uuid = uuid.uuid4()
self.cache_map = cache_map
self.lock = RLock()
self.local_config_info_processor = LocalConfigInfoProcessor(logger)
self.local_encrypted_data_key_processor = LocalEncryptedDataKeyProcessor(logger)
Expand All @@ -216,7 +231,10 @@ def start_internal(self) -> None:
def __start_run(self) -> None:
while self.executor:
try:
self.listen_execute_bell.get(timeout=5)
try:
self.listen_execute_bell.get(timeout=5)
except Exception:
pass
self.execute_config_listen()
except NacosException as e:
self.logger.error("[rpc listen execute] [rpc listen] exception " + str(e))
Expand All @@ -234,7 +252,11 @@ def execute_config_listen(self) -> None:
need_all_sync = (now - self.last_all_sync_time) >= ClientWorker.ConfigRpcTransportClient.ALL_SYNC_INTERNAL

# categorize cache_data
for cache in self.cache_map.values():

# debug
# print("at execute_config_listen, cache_map:", self.client_worker.cache_map)

for cache in self.client_worker.cache_map.values():
with self.lock:
if cache.is_sync_with_server():
cache.check_listener_md5()
Expand Down Expand Up @@ -281,7 +303,7 @@ def execute_config_listen(self) -> None:
change_config.dataId, change_config.group, change_config.tenant
)
change_keys.append(change_key)
initializing = self.cache_map.get(change_key).is_initializing()
initializing = self.client_worker.cache_map.get(change_key).is_initializing()
self.__refresh_content_and_check(change_key, not initializing)

for cache_data in listen_caches:
Expand Down Expand Up @@ -324,8 +346,8 @@ def execute_config_listen(self) -> None:
self.notify_listen_config()

def __refresh_content_and_check(self, group_key: str, notify: bool) -> None:
if self.cache_map and group_key in self.cache_map.keys():
cache_data = self.cache_map.get(group_key)
if self.client_worker.cache_map and group_key in self.client_worker.cache_map.keys():
cache_data = self.client_worker.cache_map.get(group_key)
try:
response = self.get_server_config(cache_data.data_id, cache_data.group, cache_data.tenant, 3000,
notify)
Expand Down Expand Up @@ -402,7 +424,7 @@ def query_config(self, data_id: str, group: str, tenant: str, read_timeout: int,
rpc_client = self.get_one_running_client()

if notify:
cache_data = self.cache_map.get(GroupKey.get_key_tenant(data_id, group, tenant))
cache_data = self.client_worker.cache_map.get(GroupKey.get_key_tenant(data_id, group, tenant))
if cache_data:
rpc_client = self.__ensure_rpc_client(str(cache_data.get_task_id()))

Expand All @@ -425,7 +447,7 @@ def query_config(self, data_id: str, group: str, tenant: str, read_timeout: int,
return config_response
elif response.get_error_code() == ConfigQueryResponse.CONFIG_NOT_FOUND:
self.local_config_info_processor.save_snapshot(self.get_name(), data_id, group, tenant, None)
LocalEncryptedDataKeyProcessor.save_snapshot(self.get_name(), data_id, group, tenant, None)
self.local_encrypted_data_key_processor.save_snapshot(self.get_name(), data_id, group, tenant, None)
return config_response
elif response.get_error_code() == ConfigQueryResponse.CONFIG_QUERY_CONFLICT:
self.logger.error(
Expand Down Expand Up @@ -529,36 +551,22 @@ def shutdown(self):
self.logger.info("Shutdown executor " + str(self.executor))
self.executor.shutdown(wait=False)

for value in self.cache_map.values():
for value in self.client_worker.cache_map.values():
value.set_sync_with_server(False)

@staticmethod
def __get_labels() -> dict:
# labels = {
# RemoteConstants.LABEL_SOURCE: RemoteConstants.LABEL_SOURCE_SDK,
# RemoteConstants.LABEL_MODULE: RemoteConstants.LABEL_MODULE_CONFIG,
# Constants.APPNAME: AppNameUtils.get_app_name(),
# Constants.VIPSERVER_TAG: EnvUtil.get_self_vip_server_tag(),
# Constants.AMORY_TAG: EnvUtil.get_self_amory_tag(),
# Constants.LOCATION_TAG: EnvUtil.get_self_location_tag()
# }
# return labels
# todo related utils is uncompleted.
# todo
return {}

def __init_rpc_client_handler(self, rpc_client_inner: RpcClient) -> None:
rpc_client_inner.register_server_request_handler(ConfigChangeNotifyRequestHandler(
self.cache_map, self.notify_listen_config)
self.logger, self.client_worker.cache_map, self.notify_listen_config)
)

# todo register ClientConfigMetricRequestHandler Q: necessary ?
# rpc_client_inner.register_server_request_handler(
# ClientConfigMetricRequestHandler(self.get_metrics)
# )

rpc_client_inner.register_connection_listener(
ConfigRpcConnectionEventListener(self.logger, rpc_client_inner, self.cache_map,
self.listen_execute_bell)
ConfigRpcConnectionEventListener(self.logger, rpc_client_inner, self.client_worker.cache_map,
self.notify_listen_config)
)

rpc_client_inner.set_server_list_factory(
Expand Down
Loading