Skip to content

Commit

Permalink
Merge pull request ClickHouse#68900 from ClickHouse/add-ip6tables-to-…
Browse files Browse the repository at this point in the history
…ci-helper

ci: add IPv6 support to `NetworkManager`
  • Loading branch information
thevar1able authored Sep 6, 2024
2 parents 2f668a0 + a463b2b commit 0c9ed35
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 22 deletions.
4 changes: 3 additions & 1 deletion docker/test/integration/helper_container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

FROM alpine:3.18
RUN apk add --no-cache -U iproute2 \
&& for bin in iptables iptables-restore iptables-save; \
&& for bin in \
iptables iptables-restore iptables-save \
ip6tables ip6tables-restore ip6tables-save; \
do ln -sf xtables-nft-multi "/sbin/$bin"; \
done
2 changes: 2 additions & 0 deletions tests/integration/helpers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,7 @@ def restart_instance_with_ip_change(self, node, new_ip):
self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name]
)
node.ip_address = self.get_instance_ip(node.name)
node.ipv6_address = self.get_instance_global_ipv6(node.name)
node.client = Client(node.ip_address, command=self.client_bin_path)

logging.info("Restart node with ip change")
Expand Down Expand Up @@ -3182,6 +3183,7 @@ def logging_azurite_initialization(exception, retry_number, sleep_time):
for instance in self.instances.values():
instance.docker_client = self.docker_client
instance.ip_address = self.get_instance_ip(instance.name)
instance.ipv6_address = self.get_instance_global_ipv6(instance.name)

logging.debug(
f"Waiting for ClickHouse start in {instance.name}, ip: {instance.ip_address}..."
Expand Down
157 changes: 136 additions & 21 deletions tests/integration/helpers/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import logging
import docker
import ipaddress


class PartitionManager:
Expand All @@ -26,25 +27,76 @@ def drop_instance_zk_connections(self, instance, action="DROP"):
self._check_instance(instance)

self._add_rule(
{"source": instance.ip_address, "destination_port": 2181, "action": action}
{
"source": instance.ip_address,
"destination_port": 2181,
"action": action,
}
)
self._add_rule(
{"destination": instance.ip_address, "source_port": 2181, "action": action}
{
"destination": instance.ip_address,
"source_port": 2181,
"action": action,
}
)

if instance.ipv6_address:
self._add_rule(
{
"source": instance.ipv6_address,
"destination_port": 2181,
"action": action,
}
)
self._add_rule(
{
"destination": instance.ipv6_address,
"source_port": 2181,
"action": action,
}
)

def dump_rules(self):
return _NetworkManager.get().dump_rules()
v4 = _NetworkManager.get().dump_rules()
v6 = _NetworkManager.get().dump_v6_rules()

return v4 + v6

def restore_instance_zk_connections(self, instance, action="DROP"):
self._check_instance(instance)

self._delete_rule(
{"source": instance.ip_address, "destination_port": 2181, "action": action}
{
"source": instance.ip_address,
"destination_port": 2181,
"action": action,
}
)
self._delete_rule(
{"destination": instance.ip_address, "source_port": 2181, "action": action}
{
"destination": instance.ip_address,
"source_port": 2181,
"action": action,
}
)

if instance.ipv6_address:
self._delete_rule(
{
"source": instance.ipv6_address,
"destination_port": 2181,
"action": action,
}
)
self._delete_rule(
{
"destination": instance.ipv6_address,
"source_port": 2181,
"action": action,
}
)

def partition_instances(self, left, right, port=None, action="DROP"):
self._check_instance(left)
self._check_instance(right)
Expand All @@ -59,16 +111,34 @@ def create_rule(src, dst):
rule["destination_port"] = port
return rule

def create_rule_v6(src, dst):
rule = {
"source": src.ipv6_address,
"destination": dst.ipv6_address,
"action": action,
}
if port is not None:
rule["destination_port"] = port
return rule

self._add_rule(create_rule(left, right))
self._add_rule(create_rule(right, left))

if left.ipv6_address and right.ipv6_address:
self._add_rule(create_rule_v6(left, right))
self._add_rule(create_rule_v6(right, left))

def add_network_delay(self, instance, delay_ms):
self._add_tc_netem_delay(instance, delay_ms)

def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)

if self._is_ipv6_rule(rule):
_NetworkManager.get().delete_ip6tables_rule(**rule)
else:
_NetworkManager.get().delete_iptables_rule(**rule)

while self._netem_delayed_instances:
instance = self._netem_delayed_instances.pop()
Expand All @@ -90,12 +160,27 @@ def _check_instance(instance):
if instance.ip_address is None:
raise Exception("Instance + " + instance.name + " is not launched!")

@staticmethod
def _is_ipv6_rule(rule):
if rule.get("source"):
return ipaddress.ip_address(rule["source"]).version == 6
if rule.get("destination"):
return ipaddress.ip_address(rule["destination"]).version == 6

return False

def _add_rule(self, rule):
_NetworkManager.get().add_iptables_rule(**rule)
if self._is_ipv6_rule(rule):
_NetworkManager.get().add_ip6tables_rule(**rule)
else:
_NetworkManager.get().add_iptables_rule(**rule)
self._iptables_rules.append(rule)

def _delete_rule(self, rule):
_NetworkManager.get().delete_iptables_rule(**rule)
if self._is_ipv6_rule(rule):
_NetworkManager.get().delete_ip6tables_rule(**rule)
else:
_NetworkManager.get().delete_iptables_rule(**rule)
self._iptables_rules.remove(rule)

def _add_tc_netem_delay(self, instance, delay_ms):
Expand Down Expand Up @@ -150,35 +235,65 @@ def get(cls, **kwargs):
cls._instance = cls(**kwargs)
return cls._instance

def setup_ip6tables_docker_user_chain(self):
_rules = subprocess.check_output(f"ip6tables-save", shell=True)
if "DOCKER-USER" in _rules.decode("utf-8"):
return

setup_cmds = [
["ip6tables", "--wait", "-N", "DOCKER-USER"],
["ip6tables", "--wait", "-I", "FORWARD", "-j", "DOCKER-USER"],
["ip6tables", "--wait", "-A", "DOCKER-USER", "-j", "RETURN"],
]
for cmd in setup_cmds:
self._exec_run(cmd, privileged=True)

def add_iptables_rule(self, **kwargs):
cmd = ["iptables", "--wait", "-I", "DOCKER-USER", "1"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)

def add_ip6tables_rule(self, **kwargs):
self.setup_ip6tables_docker_user_chain()

cmd = ["ip6tables", "--wait", "-I", "DOCKER-USER", "1"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)

def delete_iptables_rule(self, **kwargs):
cmd = ["iptables", "--wait", "-D", "DOCKER-USER"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)

def delete_ip6tables_rule(self, **kwargs):
cmd = ["ip6tables", "--wait", "-D", "DOCKER-USER"]
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)

def dump_rules(self):
cmd = ["iptables", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)

def dump_v6_rules(self):
cmd = ["ip6tables", "-L", "DOCKER-USER"]
return self._exec_run(cmd, privileged=True)

@staticmethod
def clean_all_user_iptables_rules():
for i in range(1000):
iptables_iter = i
# when rules will be empty, it will return error
res = subprocess.run("iptables --wait -D DOCKER-USER 1", shell=True)

if res.returncode != 0:
logging.info(
"All iptables rules cleared, "
+ str(iptables_iter)
+ " iterations, last error: "
+ str(res.stderr)
)
return
for iptables in ("iptables", "ip6tables"):
for i in range(1000):
iptables_iter = i
# when rules will be empty, it will return error
res = subprocess.run(f"{iptables} --wait -D DOCKER-USER 1", shell=True)

if res.returncode != 0:
logging.info(
f"All {iptables} rules cleared, "
+ str(iptables_iter)
+ " iterations, last error: "
+ str(res.stderr)
)
break

@staticmethod
def _iptables_cmd_suffix(
Expand Down

0 comments on commit 0c9ed35

Please sign in to comment.