From 02647837e465caf583d1712a316a233b80f724d4 Mon Sep 17 00:00:00 2001 From: Miguel Osorio Date: Wed, 27 Nov 2024 11:24:49 -0800 Subject: [PATCH] [manuf] Extract device_id from CP stage. Extract device_id from CP to inject it into the FT stage. The individualize firmware is expected to handle this at the device level as well. Signed-off-by: Miguel Osorio --- .../orchestrator/src/device_id.py | 88 +++++++++++++++++-- .../provisioning/orchestrator/src/ot_dut.py | 59 +++++++++---- .../orchestrator/src/sku_config.py | 57 ++++++++++-- .../orchestrator/tests/device_id_test.py | 15 ++++ 4 files changed, 189 insertions(+), 30 deletions(-) diff --git a/sw/host/provisioning/orchestrator/src/device_id.py b/sw/host/provisioning/orchestrator/src/device_id.py index 6de6bd6d0b18e..1c5d14e221b67 100644 --- a/sw/host/provisioning/orchestrator/src/device_id.py +++ b/sw/host/provisioning/orchestrator/src/device_id.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from sku_config import SkuConfig -from util import bytes_to_int, format_hex +import util _RESERVED_WORD = 0 @@ -61,6 +61,21 @@ def to_int(self) -> int: din |= self.year return din + @staticmethod + def from_int(din: int) -> "DeviceIdentificationNumber": + year = din & 0xF + week = (din >> 4) & 0xFF + lot = (din >> 12) & 0xFFF + wafer = (din >> 24) & 0xFF + wafer_x_coord = (din >> 32) & 0xFFF + wafer_y_coord = (din >> 44) & 0xFFF + return DeviceIdentificationNumber(year=year, + week=week, + lot=lot, + wafer=wafer, + wafer_x_coord=wafer_x_coord, + wafer_y_coord=wafer_y_coord) + class DeviceId(): """An OpenTitan device ID. @@ -89,7 +104,7 @@ def __init__(self, sku_config: SkuConfig, din: DeviceIdentificationNumber): # - 16 bits Product ID self.si_creator_id = sku_config.si_creator_id self.product_id = sku_config.product_id - self._hw_origin = bytes_to_int( + self._hw_origin = util.bytes_to_int( struct.pack(" None: + """Updates the base unique ID with another DeviceId object. + + Updates the base_id with another DeviceId object's base_id as well as + the HW origin, SiliconCreator ID, and Product ID. + + Args: + other: The other DeviceId object to update with. + """ + self.si_creator_id = other.si_creator_id + self.product_id = other.product_id + self._hw_origin = other._hw_origin + + self.din = other.din + self._base_uid = other._base_uid + self.device_id = (self._sku_specific << 128) | self._base_uid + + @staticmethod + def from_hexstr(hexstr: str) -> "DeviceId": + """Creates a DeviceId object from a hex string.""" + cp_device_id = util.parse_hexstring_to_int(hexstr) + return DeviceId.from_int(cp_device_id) + + @staticmethod + def from_int(device_id: int) -> "DeviceId": + """Creates a DeviceId object from an int.""" + # Extract SKU specific field. + sku_specific = device_id >> 128 + package_id = sku_specific & 0xFFFF + sku_id = (sku_specific >> 32) & 0xFFFFFFFF + + # Extract base unique ID. + mask = (1 << 128) - 1 + base_uid = device_id & mask + # Extract HW origin. + hw_origin = base_uid & 0xFFFFFFFFFF + si_creator_id = hw_origin & 0xFFFF + product_id = (hw_origin >> 16) & 0xFFFF + + # Extract SKU config. + sku_config = SkuConfig.from_ids(product_id, si_creator_id, package_id) + + try: + sku_name = struct.pack('>I', sku_id).decode('ascii') + except UnicodeDecodeError: + sku_name = "Unknown" + sku_config.name = sku_name + + # Extract DIN. + mask_din = (1 << 64) - 1 + din = DeviceIdentificationNumber.from_int((base_uid >> 32) & mask_din) + + return DeviceId(sku_config, din) + def to_hexstr(self) -> str: """Returns the device ID as a hex string.""" - return format_hex(self.device_id, width=64) + return util.format_hex(self.device_id, width=64) def to_int(self) -> int: """Returns the device ID as an int.""" @@ -131,9 +201,9 @@ def to_int(self) -> int: def pretty_print(self): print("> Device ID: {}".format(self)) print("SiliconCreator ID: {} ({})".format( - format_hex(self.si_creator_id, width=4), self._si_creator)) + util.format_hex(self.si_creator_id, width=4), self._si_creator)) print("Product ID: {} ({})".format( - format_hex(self.product_id, width=4), self._product)) + util.format_hex(self.product_id, width=4), self._product)) print("DIN Year: {}".format(self.din.year)) print("DIN Week: {}".format(self.din.week)) print("DIN Lot: {}".format(self.din.lot)) @@ -142,7 +212,7 @@ def pretty_print(self): print("DIN Wafer Y Coord: {}".format(self.din.wafer_y_coord)) print("Reserved: {}".format(hex(0))) print("SKU ID: {} ({})".format( - format_hex(self.sku_id), + util.format_hex(self.sku_id), self.sku_id.to_bytes(length=4, byteorder="big").decode("utf-8"))) print("Package ID: {} ({})".format(self.package_id, self._package)) diff --git a/sw/host/provisioning/orchestrator/src/ot_dut.py b/sw/host/provisioning/orchestrator/src/ot_dut.py index d5fd08b4fa2bd..1209003d1535e 100644 --- a/sw/host/provisioning/orchestrator/src/ot_dut.py +++ b/sw/host/provisioning/orchestrator/src/ot_dut.py @@ -5,6 +5,7 @@ import json import logging import os +import re import tempfile from dataclasses import dataclass @@ -50,11 +51,8 @@ class OtDut(): fpga: str require_confirmation: bool = True - def __post_init__(self): - self.log_dir = f"{self.logs_root_dir}/{str(self.device_id)[2:]}" - self._make_log_dir() - def _make_log_dir(self) -> None: + self.log_dir = f"{self.logs_root_dir}/{str(self.device_id)[2:]}" if self.require_confirmation and os.path.exists(self.log_dir): logging.warning( f"Log file {self.log_dir} already exists. Continue to overwrite." @@ -98,28 +96,59 @@ def run_cp(self) -> None: --logging=info \ {host_flags} \ --elf={device_elf} \ - --device-id="{self.device_id}" \ --test-unlock-token="{format_hex(self.test_unlock_token, width=32)}" \ --test-exit-token="{format_hex(self.test_exit_token, width=32)}" \ - --manuf-state="{_ZERO_256BIT_HEXSTR}" \ --wafer-auth-secret="{_ZERO_256BIT_HEXSTR}" \ """ - # TODO: capture DIN portion of device ID and update device ID. - # Get user confirmation before running command. logging.info(f"Running command: {cmd}") if self.require_confirmation: confirm() # Run provisioning flow and collect logs. - res = run(cmd, f"{self.log_dir}/cp_out.log.txt", - f"{self.log_dir}/cp_err.log.txt") - if res.returncode != 0: - logging.warning(f"CP failed with exit code: {res.returncode}.") - confirm() - else: - logging.info("CP completed successfully.") + with tempfile.TemporaryDirectory() as tmpdir: + stdout_logfile = f"{tmpdir}/cp_out.log.txt" + stderr_logfile = f"{tmpdir}/cp_err.log.txt" + res = run(cmd, stdout_logfile, stderr_logfile) + + if res.returncode != 0: + logging.warning(f"CP failed with exit code: {res.returncode}.") + confirm() + + with open(stdout_logfile, "r") as f: + log_data = f.read() + + pattern = r'CHIP_PROBE_DATA:\s*({.*?})' + match = re.search(pattern, log_data) + if match: + json_string = match.group(1) + try: + chip_probe_data = json.loads(json_string) + logging.info(f"CHIP_PROBE_DATA: {chip_probe_data}") + except json.JSONDecodeError as e: + logging.error(f"Failed to parse CHIP_PROBE_DATA: {e}") + confirm() + else: + logging.error("CHIP_PROBE_DATA not found.") + confirm() + + if "cp_device_id" not in chip_probe_data: + logging.error("cp_device_id found in CHIP_PROBE_DATA.") + confirm() + + logging.info( + f"Updating device ID to: {chip_probe_data['cp_device_id']}") + cp_device_id = self.device_id.from_hexstr( + chip_probe_data["cp_device_id"]) + self.device_id.update_base_id(cp_device_id) + + self._make_log_dir() + os.rename(stdout_logfile, f"{self.log_dir}/cp_out.log.txt") + os.rename(stderr_logfile, f"{self.log_dir}/cp_err.log.txt") + + logging.info(f"CP logs saved to {self.log_dir}.") + logging.info("CP completed successfully.") def run_ft(self) -> None: """Runs the FT provisioning flow on the target DUT.""" diff --git a/sw/host/provisioning/orchestrator/src/sku_config.py b/sw/host/provisioning/orchestrator/src/sku_config.py index adae1f5f4f344..8542586476f46 100644 --- a/sw/host/provisioning/orchestrator/src/sku_config.py +++ b/sw/host/provisioning/orchestrator/src/sku_config.py @@ -5,6 +5,7 @@ from collections import OrderedDict from dataclasses import dataclass +from typing import Optional import hjson @@ -22,14 +23,16 @@ class SkuConfig: si_creator: str # valid: any SiliconCreator that exists in product database package: str # valid: any package that exists in package database target_lc_state: str # valid: must be in ["dev", "prod", "prod_end"] - dice_ca: OrderedDict # valid: see CaConfig - ext_ca: OrderedDict # valid: see CaConfig + dice_ca: Optional[OrderedDict] # valid: see CaConfig + ext_ca: Optional[OrderedDict] # valid: see CaConfig token_encrypt_key: str def __post_init__(self): # Load CA configs. - self.dice_ca = CaConfig(name="dice_ca", **self.dice_ca) - self.ext_ca = CaConfig(name="ext_ca", **self.ext_ca) + if self.dice_ca: + self.dice_ca = CaConfig(name="dice_ca", **self.dice_ca) + if self.ext_ca: + self.ext_ca = CaConfig(name="ext_ca", **self.ext_ca) # Load product IDs database. self._product_ids = None @@ -49,7 +52,49 @@ def __post_init__(self): self._product_ids["si_creator_ids"][self.si_creator], 16) self.product_id = int(self._product_ids["product_ids"][self.product], 16) - self.package_id = int(self._package_ids[self.package], 16) + + if self.package in self._package_ids: + self.package_id = int(self._package_ids[self.package], 16) + + @staticmethod + def from_ids(product_id: int, si_creator_id: int, + package_id: int) -> "SkuConfig": + """Creates a SKU configuration object from product, SiliconCreator, and package IDs.""" + # Load product IDs database. + product_ids = None + with open(_PRODUCT_IDS_HJSON, "r") as fp: + product_ids = hjson.load(fp) + + # Load package IDs database. + package_ids = None + with open(_PACKAGE_IDS_HJSON, "r") as fp: + package_ids = hjson.load(fp) + + # Create SKU configuration object. + product = None + si_creator = None + package = None + for key, value in product_ids["product_ids"].items(): + if int(value, 16) == product_id: + product = key + break + for key, value in product_ids["si_creator_ids"].items(): + if int(value, 16) == si_creator_id: + si_creator = key + break + for key, value in package_ids.items(): + if int(value, 16) == package_id: + package = key + break + sku_config = SkuConfig(name="cp_device_id", + product=product, + si_creator=si_creator, + package=package, + target_lc_state="prod", + dice_ca=OrderedDict(), + ext_ca=OrderedDict(), + token_encrypt_key="") + return sku_config def validate(self) -> None: """Validates this object's attributes.""" @@ -65,7 +110,7 @@ def validate(self) -> None: "SiliconCreator ({}) must be product database.".format( self.si_creator)) # Validate package name. - if self.package not in self._package_ids: + if self.package and self.package not in self._package_ids: raise ValueError("Package ({}) must be package database.".format( self.package)) # Validate target_lc_state. diff --git a/sw/host/provisioning/orchestrator/tests/device_id_test.py b/sw/host/provisioning/orchestrator/tests/device_id_test.py index 57febbd96212f..be268671ed4c7 100644 --- a/sw/host/provisioning/orchestrator/tests/device_id_test.py +++ b/sw/host/provisioning/orchestrator/tests/device_id_test.py @@ -147,6 +147,21 @@ def test_sku_specific_reserved_field_1(self): def test_pretty_print(self): self.device_id.pretty_print() + def test_from_hexstr(self): + # Simulate getting a different base ID from CP stage. + din = DeviceIdentificationNumber(year=5, + week=12, + lot=398, + wafer=12, + wafer_x_coord=200, + wafer_y_coord=100) + expected = DeviceId(sku_config=self.sku_config, din=din) + hexstr = expected.to_hexstr() + cp_device_id = DeviceId.from_hexstr(hexstr) + + self.device_id.update_base_id(cp_device_id) + self.assertEqual(expected.to_int(), self.device_id.to_int()) + if __name__ == '__main__': unittest.main()