diff --git a/.gitignore b/.gitignore index 001db00e4b..d55affe5bc 100644 --- a/.gitignore +++ b/.gitignore @@ -73,12 +73,14 @@ swssconfig/swssconfig swssconfig/swssplayer tlm_teamd/tlm_teamd teamsyncd/teamsyncd +dashoffloadmanager/dashoffloadmanager tests/tests tests/mock_tests/tests_response_publisher tests/mock_tests/tests_fpmsyncd tests/mock_tests/tests_intfmgrd tests/mock_tests/tests_teammgrd tests/mock_tests/tests_portsyncd +tests/mock_tests/tests_dashoffloadmanager # Test Files # diff --git a/Makefile.am b/Makefile.am index 757db0d8d6..00c2d8087a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,7 @@ if GCOV_ENABLED -SUBDIRS = gcovpreload fpmsyncd neighsyncd portsyncd mclagsyncd natsyncd fdbsyncd orchagent swssconfig cfgmgr tests gearsyncd +SUBDIRS = gcovpreload fpmsyncd neighsyncd portsyncd mclagsyncd natsyncd fdbsyncd orchagent swssconfig cfgmgr tests gearsyncd dashoffloadmanager else -SUBDIRS = fpmsyncd neighsyncd portsyncd mclagsyncd natsyncd fdbsyncd orchagent swssconfig cfgmgr tests gearsyncd +SUBDIRS = fpmsyncd neighsyncd portsyncd mclagsyncd natsyncd fdbsyncd orchagent swssconfig cfgmgr tests gearsyncd dashoffloadmanager endif diff --git a/configure.ac b/configure.ac index 6edc02da91..024a87f40b 100644 --- a/configure.ac +++ b/configure.ac @@ -158,6 +158,7 @@ AC_CONFIG_FILES([ mclagsyncd/Makefile swssconfig/Makefile cfgmgr/Makefile + dashoffloadmanager/Makefile tests/Makefile orchagent/p4orch/tests/Makefile ]) diff --git a/dashoffloadmanager/Makefile.am b/dashoffloadmanager/Makefile.am new file mode 100644 index 0000000000..e2a2585036 --- /dev/null +++ b/dashoffloadmanager/Makefile.am @@ -0,0 +1,31 @@ +INCLUDES = -I $(top_srcdir)/lib -I $(top_srcdir) -I $(top_srcdir)/orchagent -I $(top_srcdir)/orchagent/flex_counter +CFLAGS_SAI = -I /usr/include/sai +DASH_LIBS = -lprotobuf -ldashapi +MISC_LIBS = -lzmq -lswsscommon -l:libboost_program_options.a +SAIMETA_LIBS = -lsaimeta -lsaimetadata + +COMMON_ORCH_SOURCE = $(top_srcdir)/orchagent/orch.cpp \ + $(top_srcdir)/orchagent/zmqorch.cpp \ + $(top_srcdir)/orchagent/request_parser.cpp \ + $(top_srcdir)/orchagent/response_publisher.cpp \ + $(top_srcdir)/lib/recorder.cpp + +bin_PROGRAMS = dashoffloadmanager + +if DEBUG +DBGFLAGS = -ggdb -DDEBUG +else +DBGFLAGS = -g +endif + +dashoffloadmanager_SOURCES = main.cpp dpuinfoprovider.cpp dashoffloadmanager.cpp dashoffloadpavalidation.cpp $(COMMON_ORCH_SOURCE) +dashoffloadmanager_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) $(CFLAGS_ASAN) +dashoffloadmanager_LDADD = $(LDFLAGS_ASAN) $(MISC_LIBS) $(SAIMETA_LIBS) $(DASH_LIBS) + +if GCOV_ENABLED +dashoffloadmanager_SOURCES += ../gcovpreload/gcovpreload.cpp +endif + +if ASAN_ENABLED +dashoffloadmanager_SOURCES += $(top_srcdir)/lib/asan.cpp +endif diff --git a/dashoffloadmanager/dashoffloadmanager.cpp b/dashoffloadmanager/dashoffloadmanager.cpp new file mode 100644 index 0000000000..09010f1161 --- /dev/null +++ b/dashoffloadmanager/dashoffloadmanager.cpp @@ -0,0 +1,177 @@ +#include "dashoffloadmanager.h" +#include "logger_utils.h" + +#include "dashoffloadpavalidation.h" + +#include + +using namespace std; +using namespace swss; + +#define SELECT_TIMEOUT 60000 + +DashOffloadManager::DashOffloadManager(const DpuInfo& dpuInfo, const std::string& zmq_server_addr, const std::string& zmq_proxy_endpoint): + m_dpuInfo(dpuInfo), + m_dpuLogKey(makeDpuLogKey(dpuInfo.dpuId)), + m_dpuStateKey("DPU" + to_string(dpuInfo.dpuId)) +{ + SWSS_LOG_ENTER(); + + DASH_MNG_LOG_NOTICE("Starting Dash Offload Manager (zmq proxy: %s -> %s)", zmq_server_addr.c_str(), zmq_proxy_endpoint.c_str()); + + m_zmqProxy = make_unique(zmq_server_addr); + + m_zmqProxy->enableProxyMode(zmq_proxy_endpoint); + + m_applDb = make_unique("APPL_DB", 0); + + m_dpuStateDb = make_unique("CHASSIS_STATE_DB", 0); +}; + +void DashOffloadManager::start() +{ + SWSS_LOG_ENTER(); + + m_thread = make_shared(&DashOffloadManager::offload_handle, this); +} + +void DashOffloadManager::join() +{ + m_thread->join(); +} + +OffloadState DashOffloadManager::getOffloadState() const +{ + return m_offloadState; +} + +void DashOffloadManager::enableDefaultOffload() +{ + SWSS_LOG_ENTER(); + + enablePaValidationOffload(); +} + +void DashOffloadManager::disableOffload() +{ + SWSS_LOG_ENTER(); + + for (auto &orch : m_offloadOrchList) + { + auto orch_selecatables = orch->getSelectables(); + for (auto &sel : orch_selecatables) + { + m_select.removeSelectable(sel); + } + } + + m_offloadOrchList.clear(); + m_offloadState.pa_validation = false; +} + +void DashOffloadManager::processDpuStateUpdate(bool dpu_down) +{ + SWSS_LOG_ENTER(); + + if (dpu_down == m_dpu_state_down) + { + return; + } + + if (dpu_down) + { + DASH_MNG_LOG_NOTICE("DPU is down - stopping the offload"); + disableOffload(); + } else { + enableDefaultOffload(); + } + + m_dpu_state_down = dpu_down; +} + +void DashOffloadManager::processDpuStateTableUpdate(swss::SubscriberStateTable *table) +{ + SWSS_LOG_ENTER(); + + std::deque entries; + table->pops(entries); + + for (auto &entry : entries) + { + auto key = kfvKey(entry); + + if (key == m_dpuStateKey) + { + for (auto i : kfvFieldsValues(entry)) + { + if (fvField(i) == "dpu_control_plane_state") + { + bool dpu_down = fvValue(i) == "down"; + return processDpuStateUpdate(dpu_down); + } + } + } + } +} + +void DashOffloadManager::enablePaValidationOffload() +{ + SWSS_LOG_ENTER(); + + if (m_offloadState.pa_validation) + { + return; + } + + Orch *orch = new DashPaVlidationOffloadOrch(m_dpuInfo, m_applDb.get(), m_zmqProxy.get()); + m_select.addSelectables(orch->getSelectables()); + + m_offloadOrchList.push_back(unique_ptr(orch)); + + m_offloadState.pa_validation = true; +} + +void DashOffloadManager::offload_handle() +{ + SWSS_LOG_ENTER(); + + auto dpuStateTable = make_unique(m_dpuStateDb.get(), CHASSIS_STATE_DPU_STATE_TABLE_NAME, TableConsumable::DEFAULT_POP_BATCH_SIZE, 0); + m_select.addSelectable(dpuStateTable.get()); + + while (!m_stop_thread) + { + Selectable *sel; + int ret; + + ret = m_select.select(&sel, SELECT_TIMEOUT); + if (ret == Select::ERROR) + { + DASH_MNG_LOG_ERROR("Error from select - %s", strerror(errno)); + continue; + } + + if (ret == Select::TIMEOUT) + { + for (auto &o : m_offloadOrchList) + { + o->doTask(); + } + + continue; + } + + if (sel == dpuStateTable.get()) + { + processDpuStateTableUpdate(dpuStateTable.get()); + } else + { + auto *c = (Executor *)sel; + c->execute(); + } + + for (auto &o : m_offloadOrchList) + { + o->doTask(); + } + } +} diff --git a/dashoffloadmanager/dashoffloadmanager.h b/dashoffloadmanager/dashoffloadmanager.h new file mode 100644 index 0000000000..a1b64180e0 --- /dev/null +++ b/dashoffloadmanager/dashoffloadmanager.h @@ -0,0 +1,48 @@ +#pragma once + +#include "dbconnector.h" +#include "zmqserver.h" +#include "select.h" +#include "subscriberstatetable.h" +#include "orch.h" +#include + +#include "dpuinfoprovider.h" + +struct OffloadState +{ + bool pa_validation = false; +}; + +class DashOffloadManager +{ +public: + DashOffloadManager(const DpuInfo& dpuInfo, const std::string& zmq_server_addr, const std::string& zmq_proxy_endpoint); + void start(); + void join(); + OffloadState getOffloadState() const; + +private: + DpuInfo m_dpuInfo; + std::string m_dpuLogKey; + std::string m_dpuStateKey; + bool m_dpu_state_down = true; + OffloadState m_offloadState; + + swss::Select m_select; + std::shared_ptr m_thread; + std::atomic m_stop_thread { false }; + std::unique_ptr m_zmqProxy; + std::unique_ptr m_applDb; + std::unique_ptr m_dpuStateDb; + std::vector> m_offloadOrchList; + + void offload_handle(); + void enableDefaultOffload(); + void disableOffload(); + void processDpuStateTableUpdate(swss::SubscriberStateTable *table); + void processOffloadTablePaValidation(swss::KeyOpFieldsValuesTuple &entry); + void processDpuStateUpdate(bool dpu_down); + + void enablePaValidationOffload(); +}; diff --git a/dashoffloadmanager/dashoffloadpavalidation.cpp b/dashoffloadmanager/dashoffloadpavalidation.cpp new file mode 100644 index 0000000000..0c560e2d90 --- /dev/null +++ b/dashoffloadmanager/dashoffloadpavalidation.cpp @@ -0,0 +1,338 @@ +#include "dashoffloadpavalidation.h" + +#include "logger_utils.h" +#include "dash/taskworker.h" +#include "dash/pbutils.h" + +#include +#include +#include + +#include + +#define ACL_OFFLOAD_TABLE_NAME_PREFIX "DASH_PA_VALIDATION_DPU" +#define ACL_OFFLOAD_TABLE_TYPE "DASH_PA_VALIDATION" +#define ACL_RULE_FORWARD_PRIO "10" +#define ACL_RULE_DROP_PRIO "1" + +using namespace swss; +using namespace std; + +std::once_flag DashPaVlidationOffloadOrch::table_type_create_once; + +size_t IpAddressHash::operator()(swss::IpAddress addr) const +{ + size_t seed = 0; + const auto &inner = addr.getIp(); + boost::hash_combine(seed, inner.family); + if (inner.family == AF_INET) + { + boost::hash_combine(seed, inner.ip_addr.ipv4_addr); + } + else if (inner.family == AF_INET6) + { + boost::hash_combine(seed, inner.ip_addr.ipv6_addr); + } + return seed; +} + +PaValidationEntry::PaValidationEntry(const std::string& key, const dash::pa_validation::PaValidation &pbEntry) + : vni(key) +{ + addresses.reserve(pbEntry.addresses().size()); + + for (const auto& addr : pbEntry.addresses()) + { + if (addr.has_ipv4()) + { + swss::ip_addr_t addrv4 = {.family = AF_INET, .ip_addr = {.ipv4_addr = addr.ipv4()}}; + addresses.push_back(IpAddress(addrv4)); + } + + if (addr.has_ipv6()) + { + swss::ip_addr_t addrv6 = {.family = AF_INET6, .ip_addr = {}}; + memcpy(addrv6.ip_addr.ipv6_addr, addr.ipv6().c_str(), sizeof(addrv6.ip_addr.ipv6_addr)); + addresses.push_back(IpAddress(addrv6)); + } + } +} + +DashPaVlidationOffloadOrch::DashPaVlidationOffloadOrch(const DpuInfo& dpuInfo, swss::DBConnector *applDb, ZmqServer *zmqServer) + : ZmqOrch(applDb, vector{APP_DASH_PA_VALIDATION_TABLE_NAME}, zmqServer), + m_dpuInfo(dpuInfo), + m_dpuLogKey(makeDpuLogKey(dpuInfo.dpuId)), + m_applAclTableTypeTable(applDb, APP_ACL_TABLE_TYPE_TABLE_NAME), + m_applAclTableTable(applDb, APP_ACL_TABLE_TABLE_NAME), + m_applAclRuleTable(applDb, APP_ACL_RULE_TABLE_NAME) +{ + SWSS_LOG_ENTER(); + + DASH_MNG_LOG_NOTICE("Started PA Validation offload"); + + initializeAclConfig(); +} + +DashPaVlidationOffloadOrch::~DashPaVlidationOffloadOrch() +{ + cleanup(); +} + +void DashPaVlidationOffloadOrch::createAclTableType() +{ + SWSS_LOG_ENTER(); + + const static vector config = { + {"MATCHES", "TUNNEL_VNI,SRC_IP,SRC_IPV6"}, + {"ACTIONS", "PACKET_ACTION"}, + {"BIND_POINTS", "PORT"} + }; + + DASH_MNG_LOG_NOTICE("Creating ACL Table type for PA Validation offload"); + + m_applAclTableTypeTable.set(ACL_OFFLOAD_TABLE_TYPE, config); +} + +void DashPaVlidationOffloadOrch::initializeAclConfig() +{ + call_once(DashPaVlidationOffloadOrch::table_type_create_once, &DashPaVlidationOffloadOrch::createAclTableType, this); +} + +void DashPaVlidationOffloadOrch::addOffloadAclTable() +{ + SWSS_LOG_ENTER(); + + if (!m_offloadAclTable.empty()) + { + return; + } + + m_offloadAclTable = ACL_OFFLOAD_TABLE_NAME_PREFIX + to_string(m_dpuInfo.dpuId); + + vector table_attrs = { + FieldValueTuple("policy_desc", "PA Validation offload table"), + FieldValueTuple("type", ACL_OFFLOAD_TABLE_TYPE), + FieldValueTuple("stage", "egress"), + FieldValueTuple("ports", m_dpuInfo.interfaces), + }; + + DASH_MNG_LOG_NOTICE("Creating ACL Table for PA Validation offload - %s (%s)", m_offloadAclTable.c_str(), m_dpuInfo.interfaces.c_str()); + + m_applAclTableTable.set(m_offloadAclTable, table_attrs); +} + +void DashPaVlidationOffloadOrch::cleanup() +{ + SWSS_LOG_ENTER(); + + if (m_offloadAclTable.empty()) + { + return; + } + + for (const auto& vni_info : m_vni_map) + { + const auto& vni = vni_info.first; + const auto& addresses = vni_info.second; + + removeOffloadAclConfig(vni, addresses.size()); + } + + DASH_MNG_LOG_NOTICE("Removing ACL Table for PA Validation offload - %s", m_offloadAclTable.c_str()); + + m_applAclTableTable.del(m_offloadAclTable); + m_offloadAclTable.clear(); +} + +bool DashPaVlidationOffloadOrch::validateEntryAdd(PaValidationEntry& entry, size_t& num_of_existing_addresses) const +{ + SWSS_LOG_ENTER(); + + auto vni_ips = m_vni_map.find(entry.vni); + if (vni_ips == m_vni_map.end()) + { + num_of_existing_addresses = 0; + return true; + } + + entry.addresses.erase(remove_if(entry.addresses.begin(), entry.addresses.end(), + [&vni_ips, &entry, this](const auto &addr) { + bool exists = vni_ips->second.find(addr) != vni_ips->second.end(); + if (exists) + { + DASH_MNG_LOG_WARN("Address %s is already added to VNI %s PA Validation offload", addr.to_string().c_str(), entry.vni.c_str()); + } + + return exists; + }), + entry.addresses.end() + ); + + num_of_existing_addresses = vni_ips->second.size(); + + return false; +} + +void DashPaVlidationOffloadOrch::entryAddToVniMap(const PaValidationEntry& entry) +{ + m_vni_map[entry.vni].insert(entry.addresses.begin(), entry.addresses.end()); +} + +std::string DashPaVlidationOffloadOrch::makeAclRuleKey(const std::string &vni, size_t rule_idx) const +{ + return m_offloadAclTable + ":RULE_VNI_" + vni + "_" + to_string(rule_idx); +} + +std::string DashPaVlidationOffloadOrch::makeAclDropRuleKey(const std::string &vni) const +{ + return m_offloadAclTable + ":RULE_VNI_" + vni + "_DROP"; +} + +void DashPaVlidationOffloadOrch::buildAclRules(const PaValidationEntry& entry, size_t rule_index_base, std::vector &forward_rules, AclRule &drop_rule) +{ + SWSS_LOG_ENTER(); + + forward_rules.reserve(entry.addresses.size() + 1); + + for (const auto &addr : entry.addresses) + { + auto ip_match_pype = addr.isV4() ? "SRC_IP" : "SRC_IPV6"; + auto mask = addr.isV4() ? "/32" : "/128"; + auto ip = addr.to_string(); + + vector rule_attrs = { + FieldValueTuple("priority", ACL_RULE_FORWARD_PRIO), + FieldValueTuple("PACKET_ACTION", "FORWARD"), + FieldValueTuple("TUNNEL_VNI", entry.vni), + FieldValueTuple(ip_match_pype, ip + mask) + }; + + auto rule_key = makeAclRuleKey(entry.vni, rule_index_base++); + + DASH_MNG_LOG_INFO("Creating ACL forward rule %s (vni:%s address:%s)", rule_key.c_str(), entry.vni.c_str(), ip.c_str()); + + forward_rules.push_back({rule_key, rule_attrs}); + } + + auto rule_key = makeAclDropRuleKey(entry.vni); + + vector rule_attrs = { + FieldValueTuple("priority", ACL_RULE_DROP_PRIO), + FieldValueTuple("PACKET_ACTION", "DROP"), + FieldValueTuple("TUNNEL_VNI", entry.vni) + }; + + DASH_MNG_LOG_INFO("Creating ACL drop rule %s (vni:%s)", rule_key.c_str(), entry.vni.c_str()); + + drop_rule = {rule_key, rule_attrs}; +} + +void DashPaVlidationOffloadOrch::addOffloadAclConfig(const PaValidationEntry& entry, bool new_vni, size_t rule_index_base) +{ + SWSS_LOG_ENTER(); + + addOffloadAclTable(); + + std::vector forward_rules; + AclRule drop_rule; + + buildAclRules(entry, rule_index_base, forward_rules, drop_rule); + + for (const auto& rule : forward_rules) + { + const auto& key = get<0>(rule); + const auto& attrs = get<1>(rule); + + m_applAclRuleTable.set(key, attrs); + } + + if (new_vni) + { + const auto& key = get<0>(drop_rule); + const auto& attrs = get<1>(drop_rule); + + m_applAclRuleTable.set(key, attrs); + } +} + +void DashPaVlidationOffloadOrch::removeOffloadAclConfig(const string& vni, size_t num_of_rules) +{ + SWSS_LOG_ENTER(); + + for (size_t rule_idx = 0; rule_idx < num_of_rules; rule_idx++) + { + auto rule_key = makeAclRuleKey(vni, rule_idx); + DASH_MNG_LOG_INFO("Removing ACL forward rule %s", rule_key.c_str()); + m_applAclRuleTable.del(rule_key); + } + + auto rule_key = makeAclDropRuleKey(vni); + DASH_MNG_LOG_INFO("Removing ACL drop rule %s", rule_key.c_str()); + m_applAclRuleTable.del(rule_key); +} + +void DashPaVlidationOffloadOrch::addPaValidationEntry(PaValidationEntry &entry) +{ + SWSS_LOG_ENTER(); + + size_t num_of_existing_addresses; + bool new_vni = validateEntryAdd(entry, num_of_existing_addresses); + + addOffloadAclConfig(entry, new_vni, num_of_existing_addresses); + + entryAddToVniMap(entry); +} + +void DashPaVlidationOffloadOrch::removePaValidationEntry(const string& paValidationKey) +{ + SWSS_LOG_ENTER(); + + auto vni = m_vni_map.find(paValidationKey); + if (vni == m_vni_map.end()) + { + DASH_MNG_LOG_WARN("VNI %s is removed or not created yet", paValidationKey.c_str()); + return; + } + + removeOffloadAclConfig(paValidationKey, vni->second.size()); + + m_vni_map.erase(vni); +} + +void DashPaVlidationOffloadOrch::doTask(ConsumerBase& consumer) +{ + SWSS_LOG_ENTER(); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + string paValidationKey = kfvKey(t); + string op = kfvOp(t); + + if (op == SET_COMMAND) + { + dash::pa_validation::PaValidation pbEntry; + + if (!parsePbMessage(kfvFieldsValues(t), pbEntry)) + { + DASH_MNG_LOG_WARN("Failed to parse protobuff messaage for PA Validation: %s", paValidationKey.c_str()); + it = consumer.m_toSync.erase(it); + continue; + } + + auto entry = PaValidationEntry(paValidationKey, pbEntry); + addPaValidationEntry(entry); + } + else if (op == DEL_COMMAND) + { + removePaValidationEntry(paValidationKey); + } + else + { + DASH_MNG_LOG_ERROR("Unknown operation %s", op.c_str()); + } + + it = consumer.m_toSync.erase(it); + } +} diff --git a/dashoffloadmanager/dashoffloadpavalidation.h b/dashoffloadmanager/dashoffloadpavalidation.h new file mode 100644 index 0000000000..fad8b77522 --- /dev/null +++ b/dashoffloadmanager/dashoffloadpavalidation.h @@ -0,0 +1,65 @@ +#pragma once + +#include "dpuinfoprovider.h" +#include "zmqorch.h" +#include "zmqserver.h" +#include "producerstatetable.h" +#include "ipaddress.h" + +#include "dash_api/pa_validation.pb.h" + +#include +#include + +struct PaValidationEntry +{ + std::string vni; + std::vector addresses; + + PaValidationEntry(const std::string& vni, const dash::pa_validation::PaValidation &entry); +}; + +struct IpAddressHash +{ + std::size_t operator()(swss::IpAddress value) const; +}; + +class DashPaVlidationOffloadOrch : public ZmqOrch +{ +public: + DashPaVlidationOffloadOrch(const DpuInfo& dpuInfo, swss::DBConnector *applDb, swss::ZmqServer *zmqServer); + ~DashPaVlidationOffloadOrch(); + void doTask(ConsumerBase& consumer); + void cleanup(); + +private: + static std::once_flag table_type_create_once; + + DpuInfo m_dpuInfo; + std::string m_dpuLogKey; + + swss::ProducerStateTable m_applAclTableTypeTable; + swss::ProducerStateTable m_applAclTableTable; + swss::ProducerStateTable m_applAclRuleTable; + + std::string m_offloadAclTable; + std::unordered_map> m_vni_map; + + using AclRule = std::tuple>; + + void initializeAclConfig(); + void createAclTableType(); + void addOffloadAclTable(); + void buildAclRules(const PaValidationEntry& entry, size_t rule_index_base, std::vector &forward_rules, AclRule &drop_rule); + void addOffloadAclConfig(const PaValidationEntry& entry, bool new_vni, size_t rule_index_base); + void removeOffloadAclConfig(const std::string& vni, size_t num_of_rules); + + void addPaValidationEntry(PaValidationEntry &entry); + void removePaValidationEntry(const std::string& paValidationKey); + + bool validateEntryAdd(PaValidationEntry& entry, size_t& num_of_existing_addresses) const; + void entryAddToVniMap(const PaValidationEntry& entry); + + std::string makeAclRuleKey(const std::string &vni, size_t rule_index) const; + std::string makeAclDropRuleKey(const std::string &vni) const; +}; diff --git a/dashoffloadmanager/dpuinfoprovider.cpp b/dashoffloadmanager/dpuinfoprovider.cpp new file mode 100644 index 0000000000..1ab08647bf --- /dev/null +++ b/dashoffloadmanager/dpuinfoprovider.cpp @@ -0,0 +1,89 @@ +#include "dpuinfoprovider.h" + +#include "dbconnector.h" +#include "table.h" +#include "logger.h" +#include "tokenize.h" + +#include +#include + +using namespace std; +using namespace swss; +using json = nlohmann::json; + +bool getDpuInfo(vector &info) +{ + DBConnector db("CONFIG_DB", 0); + auto bridge_table = Table(&db, CFG_MID_PLANE_BRIDGE_TABLE_NAME); + auto dpus_table = Table(&db, CFG_DPUS_TABLE_NAME); + auto dhcp_table = Table(&db, CFG_DHCP_SERVER_IPV4_TABLE_NAME); + + string bridge; + bridge_table.hget("GLOBAL", "bridge", bridge); + if (bridge.empty()) + { + SWSS_LOG_ERROR("Failed to get brdige info from %s", CFG_MID_PLANE_BRIDGE_TABLE_NAME); + return false; + } + + vector dpus; + dpus_table.getKeys(dpus); + if (dpus.empty()) + { + SWSS_LOG_ERROR("Failed get DPU list from %s table", CFG_DPUS_TABLE_NAME); + return false; + } + + sort(dpus.begin(), dpus.end()); + + for (uint32_t dpuId = 0; dpuId < dpus.size(); dpuId++) + { + string dpuMidplane; + dpus_table.hget(dpus[dpuId], "midplane_interface", dpuMidplane); + if (dpuMidplane.empty()) + { + SWSS_LOG_ERROR("Failed get DPU midplane for %s", dpus[dpuId].c_str()); + return false; + } + + string dpuInterfacesJson; + dpus_table.hget(dpus[dpuId], "interface", dpuInterfacesJson); + if (dpuInterfacesJson.empty()) + { + SWSS_LOG_ERROR("Failed get DPU interface for %s", dpus[dpuId].c_str()); + return false; + } + + replace(dpuInterfacesJson.begin(), dpuInterfacesJson.end(), '\'', '"'); + + string dpuInterface; + auto parsed = json::parse(dpuInterfacesJson); + if (parsed.is_discarded()) + { + SWSS_LOG_ERROR("Failed to parse DPU interfaces from %s", dpuInterfacesJson.c_str()); + return false; + } + + vector interfaces; + std::transform(parsed.items().begin(), parsed.items().end(), std::back_inserter(interfaces), + [](const auto& j) { return j.key(); }); + + auto interfacesJoined = boost::algorithm::join(interfaces, ","); + + string dpuIpKey = bridge + "|" + dpuMidplane; + string dpuIps; + dhcp_table.hget(dpuIpKey, "ips@", dpuIps); + + auto dpuIpsList = tokenize(dpuIps, ','); + if (dpuIpsList.empty()) + { + SWSS_LOG_ERROR("Failed get DPU management IP for %s", dpus[dpuId].c_str()); + return false; + } + + info.push_back(DpuInfo{dpuId, dpuIpsList[0], interfacesJoined}); + } + + return true; +} diff --git a/dashoffloadmanager/dpuinfoprovider.h b/dashoffloadmanager/dpuinfoprovider.h new file mode 100644 index 0000000000..54a2a6a222 --- /dev/null +++ b/dashoffloadmanager/dpuinfoprovider.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + +struct DpuInfo +{ + uint32_t dpuId; + std::string mgmtAddr; + std::string interfaces; +}; + +bool getDpuInfo(std::vector &info); diff --git a/dashoffloadmanager/logger_utils.h b/dashoffloadmanager/logger_utils.h new file mode 100644 index 0000000000..45b5027a20 --- /dev/null +++ b/dashoffloadmanager/logger_utils.h @@ -0,0 +1,15 @@ +#pragma once + +#include "logger.h" +#include + +#define DASH_MNG_LOG_ERROR(MSG, ...) SWSS_LOG_ERROR("[%s] - " MSG, m_dpuLogKey.c_str(), ##__VA_ARGS__) +#define DASH_MNG_LOG_WARN(MSG, ...) SWSS_LOG_WARN("[%s] - " MSG, m_dpuLogKey.c_str(), ##__VA_ARGS__) +#define DASH_MNG_LOG_NOTICE(MSG, ...) SWSS_LOG_NOTICE("[%s] - " MSG, m_dpuLogKey.c_str(), ##__VA_ARGS__) +#define DASH_MNG_LOG_INFO(MSG, ...) SWSS_LOG_INFO("[%s] - " MSG, m_dpuLogKey.c_str(), ##__VA_ARGS__) +#define DASH_MNG_LOG_DEBUG(MSG, ...) SWSS_LOG_DEBUG("[%s] - " MSG, m_dpuLogKey.c_str(), ##__VA_ARGS__) + +inline std::string makeDpuLogKey(uint32_t dpuId) +{ + return "DPU" + std::to_string(dpuId); +} diff --git a/dashoffloadmanager/main.cpp b/dashoffloadmanager/main.cpp new file mode 100644 index 0000000000..6a3e9b86ce --- /dev/null +++ b/dashoffloadmanager/main.cpp @@ -0,0 +1,108 @@ +#include +#include +#include + +#include "dbconnector.h" +#include "zmqserver.h" +#include "select.h" + +#include "dashoffloadmanager.h" +#include "dpuinfoprovider.h" + +#define DEFAULT_ZMQ_PORT 8100 + +using namespace swss; +using namespace std; +namespace po = boost::program_options; + +struct Args +{ + string zmq_server_base_addr; + uint16_t zmq_port; +}; + +bool parse_args(int argc, char **argv, Args &args) +{ + po::options_description desc("Usage"); + desc.add_options() + ("help,h", "Show help") + ("zmq_server_base_addr", po::value(&args.zmq_server_base_addr)->required(), "ZMQ Proxy server base address") + ("zmq_port", po::value(&args.zmq_port)->default_value(DEFAULT_ZMQ_PORT), "ZMQ port") + ; + + try + { + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + + if (vm.count("help")) + { + cout << desc << "\n"; + return 0; + } + + po::notify(vm); + } + catch(po::error& e) + { + cerr << "Error: " << e.what() << "\n\n"; + cerr << desc << "\n"; + return false; + } + + return true; +} + +string getIpByindex(const string& base, uint32_t index) +{ + struct in_addr addr; + inet_pton(AF_INET, base.c_str(), &addr); + + uint32_t new_ip = ntohl(addr.s_addr) + index; + addr.s_addr = htonl(new_ip); + + char ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &addr, ip_str, INET_ADDRSTRLEN); + + return ip_str; +} + +int main(int argc, char **argv) +{ + SonicDBConfig::initializeGlobalConfig(SonicDBConfig::DEFAULT_SONIC_DB_GLOBAL_CONFIG_FILE); + Logger::linkToDbNative("dashoffloadmanager"); + + Args args; + if (!parse_args(argc, argv, args)) + { + return EXIT_FAILURE; + } + + vector dpus; + auto ok = getDpuInfo(dpus); + if (!ok) + { + SWSS_LOG_ERROR("Failed to read dpu info"); + return EXIT_FAILURE; + } + + std::vector> dpu_offload_managers; + + for (const auto& dpu : dpus) + { + string zmqServerAddr = "tcp://" + getIpByindex(args.zmq_server_base_addr, dpu.dpuId) + ":" + to_string(args.zmq_port); + string zmqDpuAddr = "tcp://" + dpu.mgmtAddr + ":" + to_string(args.zmq_port); + + auto om = make_unique(dpu, zmqServerAddr, zmqDpuAddr); + om->start(); + + dpu_offload_managers.push_back(move(om)); + } + + for (auto& manager: dpu_offload_managers) + { + manager->join(); + } + + return 0; +} diff --git a/tests/mock_tests/Makefile.am b/tests/mock_tests/Makefile.am index 126f4e88c5..c83e19820c 100644 --- a/tests/mock_tests/Makefile.am +++ b/tests/mock_tests/Makefile.am @@ -6,9 +6,9 @@ DASH_PROTO_DIR = $(top_srcdir)/orchagent/dash/proto CFLAGS_SAI = -I /usr/include/sai -TESTS = tests tests_intfmgrd tests_teammgrd tests_portsyncd tests_fpmsyncd tests_response_publisher +TESTS = tests tests_intfmgrd tests_teammgrd tests_portsyncd tests_fpmsyncd tests_response_publisher tests_dashoffloadmanager -noinst_PROGRAMS = tests tests_intfmgrd tests_teammgrd tests_portsyncd tests_fpmsyncd tests_response_publisher +noinst_PROGRAMS = tests tests_intfmgrd tests_teammgrd tests_portsyncd tests_fpmsyncd tests_response_publisher tests_dashoffloadmanager LDADD_SAI = -lsaimeta -lsaimetadata -lsaivs -lsairedis @@ -268,3 +268,27 @@ tests_response_publisher_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CF tests_response_publisher_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_response_publisher_INCLUDES) tests_response_publisher_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \ -lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread + +## dashoffloadmanager unit tests + +tests_dashoffloadmanager_SOURCES = dashoffloadmanager/dashoffloadmanager_ut.cpp \ + $(top_srcdir)/dashoffloadmanager/dpuinfoprovider.cpp \ + $(top_srcdir)/dashoffloadmanager/dashoffloadmanager.cpp \ + $(top_srcdir)/dashoffloadmanager/dashoffloadpavalidation.cpp \ + $(top_srcdir)/orchagent/orch.cpp \ + $(top_srcdir)/orchagent/request_parser.cpp \ + $(top_srcdir)/lib/recorder.cpp \ + mock_dbconnector.cpp \ + mock_table.cpp \ + mock_hiredis.cpp \ + mock_redisreply.cpp \ + mock_subscriberstatetable.cpp \ + fake_response_publisher.cpp \ + fake_zmq_orch.cpp \ + fake_zmq_server.cpp + +tests_dashoffloadmanager_INCLUDES = $(tests_INCLUDES) -I$(top_srcdir)/dashoffloadmanager +tests_dashoffloadmanager_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) +tests_dashoffloadmanager_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_dashoffloadmanager_INCLUDES) +tests_dashoffloadmanager_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \ + -lswsscommon -lprotobuf -ldashapi -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread -lgmock -lgmock_main diff --git a/tests/mock_tests/dashoffloadmanager/dashoffloadmanager_ut.cpp b/tests/mock_tests/dashoffloadmanager/dashoffloadmanager_ut.cpp new file mode 100644 index 0000000000..5cfc2758c2 --- /dev/null +++ b/tests/mock_tests/dashoffloadmanager/dashoffloadmanager_ut.cpp @@ -0,0 +1,232 @@ +#include "gtest/gtest.h" +#include "mock_table.h" + +#include "dpuinfoprovider.h" + +#include + +#define private public +#include "dashoffloadmanager.h" +#include "dashoffloadpavalidation.h" +#undef private + +#define ACL_OFFLOAD_TABLE_TYPE "DASH_PA_VALIDATION" + +using namespace std; +using namespace swss; + +namespace dashoffloadmanager_test { + struct DashOffloadManagerTest : public ::testing::Test + { + using VniAddresses = vector; + using PaOffloadAclConfig = unordered_map; + + virtual void SetUp() override + { + ::testing_db::reset(); + + DBConnector db("CONFIG_DB", 0); + auto bridge_table = Table(&db, CFG_MID_PLANE_BRIDGE_TABLE_NAME); + auto dpus_table = Table(&db, CFG_DPUS_TABLE_NAME); + auto dhcp_table = Table(&db, CFG_DHCP_SERVER_IPV4_TABLE_NAME); + + const string bridge = "test-bridge"; + bridge_table.hset("GLOBAL", "bridge", bridge); + + const vector dpus = {"dpu0", "dpu1"}; + for (const auto& dpu : dpus) { + const string dpu_midplane = dpu + "midplane"; + const string dpu_interface = "{\"" + dpu + "interface\" : \"Ethenet0\"}"; + const string dpu_key = bridge + "|" + dpu_midplane; + const string dpu_ips = "1.2.3.4"; + + dpus_table.hset(dpu, "midplane_interface", dpu + "midplane"); + dpus_table.hset(dpu, "interface", dpu_interface); + dhcp_table.hset(dpu_key, "ips@", dpu_ips); + } + } + + void validateTableValue(Table *table, const string& key, const string& field, const string& exp) + { + string value; + ASSERT_TRUE(table->hget(key, field, value)) << "Get for " << key << " field:" << field; + ASSERT_EQ(value, exp); + } + + void addPaValidationEntry(DashPaVlidationOffloadOrch *orch, const string& vni, const vector &ips) + { + PaValidationEntry entry { vni, {}}; + for (const auto& addr : ips) { + entry.addresses.push_back(IpAddress(addr)); + } + + orch->addPaValidationEntry(entry); + } + + void validatePaAclConfig(DashPaVlidationOffloadOrch *orch, const DpuInfo &info, const PaOffloadAclConfig& config) + { + DBConnector db("APPL_DB", 0); + auto acl_tables = Table(&db, APP_ACL_TABLE_TABLE_NAME); + auto acl_rules = Table(&db, APP_ACL_RULE_TABLE_NAME); + auto table_name = orch->m_offloadAclTable; + + ASSERT_FALSE(table_name.empty()); + + validateTableValue(&acl_tables, table_name, "type", ACL_OFFLOAD_TABLE_TYPE); + validateTableValue(&acl_tables, table_name, "stage", "egress"); + validateTableValue(&acl_tables, table_name, "ports", info.interfaces); + + size_t rules_count = 0; + + for (const auto& v : config) + { + auto vni = v.first; + auto addresses = v.second; + uint32_t rule_idx = 0; + + for (const auto& addr : addresses) + { + auto rule_key = orch->makeAclRuleKey(vni, rule_idx++); + auto ip_match = (addr.find("/32") != string::npos) ? "SRC_IP" : "SRC_IPV6"; + + validateTableValue(&acl_rules, rule_key, "priority", "10"); + validateTableValue(&acl_rules, rule_key, "PACKET_ACTION", "FORWARD"); + validateTableValue(&acl_rules, rule_key, "TUNNEL_VNI", vni); + validateTableValue(&acl_rules, rule_key, ip_match, addr); + } + + auto drop_rule_key = orch->makeAclDropRuleKey(vni); + + validateTableValue(&acl_rules, drop_rule_key, "priority", "1"); + validateTableValue(&acl_rules, drop_rule_key, "PACKET_ACTION", "DROP"); + validateTableValue(&acl_rules, drop_rule_key, "TUNNEL_VNI", vni); + + rules_count += addresses.size() + 1; + } + + vector all_rules; + acl_rules.getKeys(all_rules); + ASSERT_EQ(all_rules.size(), rules_count); + } + + void validatePaAclConfigEmpty() + { + DBConnector db("APPL_DB", 0); + auto acl_tables = Table(&db, APP_ACL_TABLE_TABLE_NAME); + auto acl_rules = Table(&db, APP_ACL_RULE_TABLE_NAME); + + vector tables; + acl_tables.getKeys(tables); + ASSERT_TRUE(tables.empty()); + + vector rules; + acl_rules.getKeys(rules); + ASSERT_TRUE(rules.empty()); + } + }; + + TEST_F(DashOffloadManagerTest, DPUInfo) { + vector info; + ASSERT_TRUE(getDpuInfo(info)); + ASSERT_EQ(info.size(), 2); + + for (size_t i = 0; i < info.size(); i++) { + const auto& dpu = info[i]; + ASSERT_EQ(dpu.dpuId, i); + ASSERT_EQ(dpu.mgmtAddr, "1.2.3.4"); + ASSERT_EQ(dpu.interfaces, "dpu" + to_string(i) + "interface"); + } + } + + TEST_F(DashOffloadManagerTest, OffloadState) { + vector info; + ASSERT_TRUE(getDpuInfo(info)); + + DashOffloadManager om(info[0], "", ""); + // DPU is down + ASSERT_FALSE(om.getOffloadState().pa_validation); + ASSERT_TRUE(om.m_offloadOrchList.empty()); + + // DPU is up + DBConnector db("CHASSIS_STATE_DB", 0); + auto dpu_state = Table(&db, CHASSIS_STATE_DPU_STATE_TABLE_NAME); + dpu_state.hset("DPU0", "dpu_control_plane_state", "up"); + + SubscriberStateTable dpu_sub(&db, CHASSIS_STATE_DPU_STATE_TABLE_NAME, TableConsumable::DEFAULT_POP_BATCH_SIZE, 0); + om.processDpuStateTableUpdate(&dpu_sub); + ASSERT_TRUE(om.getOffloadState().pa_validation); + ASSERT_EQ(om.m_offloadOrchList.size(), 1); + + // check double event + dpu_state.hset("DPU0", "dpu_control_plane_state", "up"); + om.processDpuStateTableUpdate(&dpu_sub); + ASSERT_TRUE(om.getOffloadState().pa_validation); + ASSERT_EQ(om.m_offloadOrchList.size(), 1); + + // DPU is down + dpu_state.hset("DPU0", "dpu_control_plane_state", "down"); + om.processDpuStateTableUpdate(&dpu_sub); + ASSERT_FALSE(om.getOffloadState().pa_validation); + ASSERT_TRUE(om.m_offloadOrchList.empty()); + } + + void validateAclTableType(const DpuInfo &info) + { + DBConnector db("APPL_DB", 0); + auto acl_table_type = Table(&db, APP_ACL_TABLE_TYPE_TABLE_NAME); + + string value; + ASSERT_TRUE(acl_table_type.hget(ACL_OFFLOAD_TABLE_TYPE, "MATCHES", value)); + ASSERT_EQ(value, "TUNNEL_VNI,SRC_IP,SRC_IPV6"); + + ASSERT_TRUE(acl_table_type.hget(ACL_OFFLOAD_TABLE_TYPE, "ACTIONS", value)); + ASSERT_EQ(value, "PACKET_ACTION"); + + ASSERT_TRUE(acl_table_type.hget(ACL_OFFLOAD_TABLE_TYPE, "BIND_POINTS", value)); + ASSERT_EQ(value, "PORT"); + } + + TEST_F(DashOffloadManagerTest, PaValidation) + { + DBConnector db("APPL_DB", 0); + + vector info; + ASSERT_TRUE(getDpuInfo(info)); + + DashPaVlidationOffloadOrch pa_orch(info[0], &db, nullptr); + + pa_orch.createAclTableType(); + validateAclTableType(info[0]); + + // add rules + addPaValidationEntry(&pa_orch, "1000", {"10.1.1.1", "10.2.2.2", "ff01::10"}); + addPaValidationEntry(&pa_orch, "2000", {"20.1.1.1", "20.2.2.2", "ff01::20"}); + + auto acl_config = PaOffloadAclConfig { + {"1000", {"10.1.1.1/32", "10.2.2.2/32", "ff01::10/128"}}, + {"2000", {"20.1.1.1/32", "20.2.2.2/32", "ff01::20/128"}} + }; + validatePaAclConfig(&pa_orch, info[0], acl_config); + + // add more rules + addPaValidationEntry(&pa_orch, "1000", {"10.2.2.2", "10.3.3.3"}); // 10.2.2.2 already added + addPaValidationEntry(&pa_orch, "2000", {"ff01::20", "ff02::20"}); // ff01::20 already added + + acl_config = { + {"1000", {"10.1.1.1/32", "10.2.2.2/32", "ff01::10/128", "10.3.3.3/32"}}, + {"2000", {"20.1.1.1/32", "20.2.2.2/32", "ff01::20/128", "ff02::20/128"}} + }; + validatePaAclConfig(&pa_orch, info[0], acl_config); + + // remove vni 1000 + pa_orch.removePaValidationEntry("1000"); + + acl_config = { + {"2000", {"20.1.1.1/32", "20.2.2.2/32", "ff01::20/128", "ff02::20/128"}} + }; + validatePaAclConfig(&pa_orch, info[0], acl_config); + + pa_orch.cleanup(); + validatePaAclConfigEmpty(); + } +} diff --git a/tests/mock_tests/database_config.json b/tests/mock_tests/database_config.json index baf705ea23..de0a999acf 100644 --- a/tests/mock_tests/database_config.json +++ b/tests/mock_tests/database_config.json @@ -72,6 +72,11 @@ "separator": "|", "instance" : "redis_chassis" }, + "CHASSIS_STATE_DB" : { + "id" : 13, + "separator": "|", + "instance" : "redis_chassis" + }, "APPL_STATE_DB" : { "id" : 14, "separator": ":", diff --git a/tests/mock_tests/fake_zmq_orch.cpp b/tests/mock_tests/fake_zmq_orch.cpp new file mode 100644 index 0000000000..29a6c7aecf --- /dev/null +++ b/tests/mock_tests/fake_zmq_orch.cpp @@ -0,0 +1,10 @@ +#include "zmqorch.h" +#include "dbconnector.h" +#include "zmqserver.h" + +#include +#include + + +ZmqOrch::ZmqOrch(swss::DBConnector *db, const std::vector &tableNames, swss::ZmqServer *zmqServer) {}; +void ZmqOrch::doTask(Consumer &consumer) { }; diff --git a/tests/mock_tests/fake_zmq_server.cpp b/tests/mock_tests/fake_zmq_server.cpp new file mode 100644 index 0000000000..32a5132d95 --- /dev/null +++ b/tests/mock_tests/fake_zmq_server.cpp @@ -0,0 +1,10 @@ +#include "zmqserver.h" + +#include + +namespace swss +{ +ZmqServer::ZmqServer(const std::string& endpoint) {}; +ZmqServer::~ZmqServer() {}; +void ZmqServer::enableProxyMode(const std::string& proxy_endpoint) {}; +} diff --git a/tests/mock_tests/mock_dbconnector.cpp b/tests/mock_tests/mock_dbconnector.cpp index 7cabdc2224..f409aab909 100644 --- a/tests/mock_tests/mock_dbconnector.cpp +++ b/tests/mock_tests/mock_dbconnector.cpp @@ -57,6 +57,10 @@ namespace swss } } + DBConnector::DBConnector(const std::string& dbName, unsigned int timeout, bool isTcpConn, const SonicDBKey &key) + : DBConnector(dbName, timeout, isTcpConn) + {} + int DBConnector::getDbId() const { return m_dbId; diff --git a/tests/mock_tests/mock_table.cpp b/tests/mock_tests/mock_table.cpp index 7dc77d42ad..b770755807 100644 --- a/tests/mock_tests/mock_table.cpp +++ b/tests/mock_tests/mock_table.cpp @@ -99,6 +99,16 @@ namespace swss } } + void Table::hset(const std::string &key, + const std::string &field, + const std::string &value, + const std::string &op, + const std::string &prefix) + { + FieldValueTuple fv(field, value); + return set(key, std::vector{fv}, op, prefix); + } + void Table::getKeys(std::vector &keys) { keys.clear();