Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support of common security cipher module for encryption and decryption of a passkey #17201

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions src/sonic-py-common/sonic_py_common/security_cipher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
'''

A common module for handling the encryption and
decryption of the feature passkey. It also takes
care of storing the secure cipher at root
protected file system

'''

import subprocess
import threading
import syslog
import os
from swsscommon.swsscommon import ConfigDBConnector

class master_key_mgr:
_instance = None
_lock = threading.Lock()
_initialized = False

def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(master_key_mgr, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance

def __init__(self):
if not self._initialized:
self._file_path = "/etc/cipher_pass"
self._config_db = ConfigDBConnector()
self._config_db.connect()
# Note: Kept 1st index NA intentionally to map it with the cipher_pass file
# contents. The file has a comment at the 1st row / line
self._feature_list = ["NA", "TACPLUS", "RADIUS", "LDAP"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a master key per feature? I thought we're using one master key for all features.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is better to have different Master keys for different feature. This way, there will not be any inter-dependency between them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not complicate things for the user, just follow well established practices (e.g a master key for all features) from the popular NOS if possible, my 2 cents.

Copy link
Contributor Author

@nmoray nmoray Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this approach, we can further extend this implementation for other modules too not just TACPLUS, RADIUS and LDAP. Additionally, it is upto the user if he / she needs to use different keys or can use same keys for all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will give flexibility to the user, in my opinion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we go with single key, it will be having huge dependancy in case of changing that key. User needs to change the encrypted passkey in CONFIG_DB for all the features.

Copy link
Collaborator

@venkatmahalingam venkatmahalingam Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree, IMO, we should just look at the behavior of other popular NOS that's been there for many years, if required for any SONiC use-case, we can think about providing the flexibility in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this approach, we are already providing the flexibility to the user (why to wait for the future :-)). They can have either same or different keys for different features.

The proprietary NOSes have different architectures and they have implemented the feature which can be fitted into their infrastructure. :-) I guess, it is always better if we can add bit of flexibility into any of the designs.

if not os.path.exists(self._file_path):
with open(self._file_path, 'w') as file:
file.writelines("#Auto generated file for storing the encryption passwords\n")
file.writelines("TACPLUS : \nRADIUS : \nLDAP :\n")
os.chmod(self._file_path, 0o640)
self._initialized = True

# Write cipher_pass file
def __write_passwd_file(self, feature_type, passwd):
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Invalid feature type: {}".format(feature_type))
return

if feature_type in self._feature_list:
try:
with open(self._file_path, 'r') as file:
lines = file.readlines()
# Update the password for given feature
lines[self._feature_list.index(feature_type)] = feature_type + ' : ' + passwd + '\n'

os.chmod(self._file_path, 0o777)
with open(self._file_path, 'w') as file:
file.writelines(lines)
nmoray marked this conversation as resolved.
Show resolved Hide resolved
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__write_passwd_file: Read permission denied: {}".format(self._file_path))

# Read cipher pass file and return the feature specifc
nmoray marked this conversation as resolved.
Show resolved Hide resolved
# password
def __read_passwd_file(self, feature_type):
passwd = None
if feature_type == 'NA':
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Invalid feature type: {}".format(feature_type))
return passwd

if feature_type in self._feature_list:
try:
os.chmod(self._file_path, 0o644)
with open(self._file_path, "r") as file:
lines = file.readlines()
for line in lines:
if feature_type in line:
passwd = line.split(' : ')[1]
os.chmod(self._file_path, 0o640)
except FileNotFoundError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: File {} no found".format(self._file_path))
except PermissionError:
syslog.syslog(syslog.LOG_ERR, "__read_passwd_file: Read permission denied: {}".format(self._file_path))

return passwd

# Encrypt the passkey
def encrypt_passkey(self, feature_type, secret, passwd):
cmd = [ 'openssl', 'enc', '-aes-128-cbc', '-A', '-a', '-salt', '-pbkdf2', '-pass', 'pass:' + passwd ]
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
outsecret, errs = p.communicate(input=secret)
if not errs:
self.__write_passwd_file(feature_type, passwd)

return outsecret,errs

# Decrypt the passkey
def decrypt_passkey(self, feature_type, secret):
errs = "Passkey Decryption failed"
passwd = self.__read_passwd_file(feature_type)
if passwd is not None:
cmd = "echo " + format(secret) + " | openssl enc -aes-128-cbc -a -d -salt -pbkdf2 -pass pass:" + passwd
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
output, errs = proc.communicate()

if not errs:
output = output.decode('utf-8')

return output, errs

# Check if the encryption is enabled
def is_key_encrypt_enabled(self, table, entry):
key = 'key_encrypt'
data = self._config_db.get_entry(table, entry)
if data:
if key in data:
return data[key]
return False

def del_cipher_pass(self):
try:
# Check if the file exists
if os.path.exists(self._file_path):
# Attempt to delete the file
os.remove(self._file_path)
syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: {} file has been removed".format((self._file_path)))
else:
syslog.syslog(syslog.LOG_INFO, "del_cipher_pass: {} file doesn't exist".format((self._file_path)))
except Exception as e:
syslog.syslog(syslog.LOG_ERR, "del_cipher_pass: {} Exception occurred: {}".format((e)))

12 changes: 12 additions & 0 deletions src/sonic-py-common/tests/mock_swsscommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ def connect(self, db):

def get(self, db, table, field):
return self.data.get(field, "N/A")


class ConfigDBConnector:
def __init__(self):
self.CONFIG_DB = 'CONFIG_DB'
self.data = {"key_encrypt": "True"}

def connect(self):
pass

def get(self, db, table, field):
return self.data.get(field, "N/A")
73 changes: 73 additions & 0 deletions src/sonic-py-common/tests/test_security_cipher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import sys

if sys.version_info.major == 3:
from unittest import mock
else:
import mock

import pytest
from sonic_py_common.security_cipher import master_key_mgr
from .mock_swsscommon import ConfigDBConnector

# TODO: Remove this if/else block once we no longer support Python 2
if sys.version_info.major == 3:
BUILTINS = "builtins"
else:
BUILTINS = "__builtin__"

DEFAULT_FILE = [
"#Auto generated file for storing the encryption passwords",
"TACPLUS : ",
"RADIUS : ",
"LDAP :"
]

UPDATED_FILE = [
"#Auto generated file for storing the encryption passwords",
"TACPLUS : ",
"RADIUS : TEST2",
"LDAP :"
]


class TestSecurityCipher(object):
def test_passkey_encryption(self):
with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \
mock.patch("os.chmod") as mock_chmod, \
mock.patch("{}.open".format(BUILTINS),mock.mock_open()) as mock_file:
temp = master_key_mgr()

# Use patch to replace the built-in 'open' function with a mock
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
mock.patch("os.chmod") as mock_chmod:
mock_fd = mock.MagicMock()
mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE)
mock_file.return_value.__enter__.return_value = mock_fd
encrypt, err = temp.encrypt_passkey("TACPLUS", "passkey1", "TEST1")
assert encrypt != "passkey1"

def test_passkey_decryption(self):
with mock.patch("sonic_py_common.security_cipher.ConfigDBConnector", new=ConfigDBConnector), \
mock.patch("os.chmod") as mock_chmod, \
mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file:
temp = master_key_mgr()

# Use patch to replace the built-in 'open' function with a mock
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
mock.patch("os.chmod") as mock_chmod:
mock_fd = mock.MagicMock()
mock_fd.readlines = mock.MagicMock(return_value=DEFAULT_FILE)
mock_file.return_value.__enter__.return_value = mock_fd
encrypt, err = temp.encrypt_passkey("RADIUS", "passkey2", "TEST2")

# Use patch to replace the built-in 'open' function with a mock
#with mock.patch("{}.open".format(BUILTINS), mock.mock_open(read_data=EXPECTED_PASSWD)) as mock_file:
with mock.patch("{}.open".format(BUILTINS), mock.mock_open()) as mock_file, \
mock.patch("os.chmod") as mock_chmod:
mock_fd = mock.MagicMock()
mock_fd.readlines = mock.MagicMock(return_value=UPDATED_FILE)
mock_file.return_value.__enter__.return_value = mock_fd
decrypt, err = temp.decrypt_passkey("RADIUS", encrypt)
assert decrypt == "passkey2"


Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ module sonic-system-tacacs {

leaf passkey {
type string {
length "1..65";
length "1..256";
pattern "[^ #,]*" {
error-message 'TACACS shared secret (Valid chars are ASCII printable except SPACE, "#", and ",")';
}
Expand Down
Loading