Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[manuf] Extract device_id from CP stage. #25437

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 79 additions & 9 deletions sw/host/provisioning/orchestrator/src/device_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass

from sku_config import SkuConfig
from util import bytes_to_int, format_hex
import util

_RESERVED_WORD = 0

Expand Down Expand Up @@ -61,6 +61,21 @@ def to_int(self) -> int:
din |= self.year
return din

@staticmethod
def from_int(din: int) -> "DeviceIdentificationNumber":
year = din & 0xF
week = (din >> 4) & 0xFF
lot = (din >> 12) & 0xFFF
wafer = (din >> 24) & 0xFF
wafer_x_coord = (din >> 32) & 0xFFF
wafer_y_coord = (din >> 44) & 0xFFF
return DeviceIdentificationNumber(year=year,
week=week,
lot=lot,
wafer=wafer,
wafer_x_coord=wafer_x_coord,
wafer_y_coord=wafer_y_coord)


class DeviceId():
"""An OpenTitan device ID.
Expand Down Expand Up @@ -89,7 +104,7 @@ def __init__(self, sku_config: SkuConfig, din: DeviceIdentificationNumber):
# - 16 bits Product ID
self.si_creator_id = sku_config.si_creator_id
self.product_id = sku_config.product_id
self._hw_origin = bytes_to_int(
self._hw_origin = util.bytes_to_int(
struct.pack("<HH", self.si_creator_id, self.product_id))

# Build Device Identification Number with:
Expand All @@ -102,13 +117,14 @@ def __init__(self, sku_config: SkuConfig, din: DeviceIdentificationNumber):
self.din = din

# Build base unique ID.
self._base_uid = bytes_to_int(
self._base_uid = util.bytes_to_int(
struct.pack("<IQI", self._hw_origin, self.din.to_int(), 0))

# Build SKU specific field.
self.package_id = sku_config.package_id
self.sku_id = bytes_to_int(self._sku.upper()[:4].encode("utf-8")[::-1])
self._sku_specific = bytes_to_int(
self.sku_id = util.bytes_to_int(
self._sku.upper()[:4].encode("utf-8")[::-1])
self._sku_specific = util.bytes_to_int(
struct.pack(
"<HHIQ",
self.package_id,
Expand All @@ -120,9 +136,63 @@ def __init__(self, sku_config: SkuConfig, din: DeviceIdentificationNumber):
# Build full device ID.
self.device_id = (self._sku_specific << 128) | self._base_uid

def update_base_id(self, other: "DeviceId") -> None:
"""Updates the base unique ID with another DeviceId object.

Updates the base_id with another DeviceId object's base_id as well as
the HW origin, SiliconCreator ID, and Product ID.

Args:
other: The other DeviceId object to update with.
"""
self.si_creator_id = other.si_creator_id
self.product_id = other.product_id
self._hw_origin = other._hw_origin

self.din = other.din
self._base_uid = other._base_uid
self.device_id = (self._sku_specific << 128) | self._base_uid

@staticmethod
def from_hexstr(hexstr: str) -> "DeviceId":
"""Creates a DeviceId object from a hex string."""
cp_device_id = util.parse_hexstring_to_int(hexstr)
return DeviceId.from_int(cp_device_id)

@staticmethod
def from_int(device_id: int) -> "DeviceId":
"""Creates a DeviceId object from an int."""
# Extract SKU specific field.
sku_specific = device_id >> 128
package_id = sku_specific & 0xFFFF
sku_id = (sku_specific >> 32) & 0xFFFFFFFF

# Extract base unique ID.
mask = (1 << 128) - 1
base_uid = device_id & mask
# Extract HW origin.
hw_origin = base_uid & 0xFFFFFFFFFF
si_creator_id = hw_origin & 0xFFFF
product_id = (hw_origin >> 16) & 0xFFFF

# Extract SKU config.
sku_config = SkuConfig.from_ids(product_id, si_creator_id, package_id)

try:
sku_name = struct.pack('>I', sku_id).decode('ascii')
except UnicodeDecodeError:
sku_name = "Unknown"
sku_config.name = sku_name

# Extract DIN.
mask_din = (1 << 64) - 1
din = DeviceIdentificationNumber.from_int((base_uid >> 32) & mask_din)

return DeviceId(sku_config, din)

def to_hexstr(self) -> str:
"""Returns the device ID as a hex string."""
return format_hex(self.device_id, width=64)
return util.format_hex(self.device_id, width=64)

def to_int(self) -> int:
"""Returns the device ID as an int."""
Expand All @@ -131,9 +201,9 @@ def to_int(self) -> int:
def pretty_print(self):
print("> Device ID: {}".format(self))
print("SiliconCreator ID: {} ({})".format(
format_hex(self.si_creator_id, width=4), self._si_creator))
util.format_hex(self.si_creator_id, width=4), self._si_creator))
print("Product ID: {} ({})".format(
format_hex(self.product_id, width=4), self._product))
util.format_hex(self.product_id, width=4), self._product))
print("DIN Year: {}".format(self.din.year))
print("DIN Week: {}".format(self.din.week))
print("DIN Lot: {}".format(self.din.lot))
Expand All @@ -142,7 +212,7 @@ def pretty_print(self):
print("DIN Wafer Y Coord: {}".format(self.din.wafer_y_coord))
print("Reserved: {}".format(hex(0)))
print("SKU ID: {} ({})".format(
format_hex(self.sku_id),
util.format_hex(self.sku_id),
self.sku_id.to_bytes(length=4, byteorder="big").decode("utf-8")))
print("Package ID: {} ({})".format(self.package_id,
self._package))
Expand Down
59 changes: 44 additions & 15 deletions sw/host/provisioning/orchestrator/src/ot_dut.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import os
import re
import tempfile
from dataclasses import dataclass

Expand Down Expand Up @@ -50,11 +51,8 @@ class OtDut():
fpga: str
require_confirmation: bool = True

def __post_init__(self):
self.log_dir = f"{self.logs_root_dir}/{str(self.device_id)[2:]}"
self._make_log_dir()

def _make_log_dir(self) -> None:
self.log_dir = f"{self.logs_root_dir}/{str(self.device_id)[2:]}"
if self.require_confirmation and os.path.exists(self.log_dir):
logging.warning(
f"Log file {self.log_dir} already exists. Continue to overwrite."
Expand Down Expand Up @@ -98,28 +96,59 @@ def run_cp(self) -> None:
--logging=info \
{host_flags} \
--elf={device_elf} \
--device-id="{self.device_id}" \
--test-unlock-token="{format_hex(self.test_unlock_token, width=32)}" \
--test-exit-token="{format_hex(self.test_exit_token, width=32)}" \
--manuf-state="{_ZERO_256BIT_HEXSTR}" \
--wafer-auth-secret="{_ZERO_256BIT_HEXSTR}" \
"""

# TODO: capture DIN portion of device ID and update device ID.

# Get user confirmation before running command.
logging.info(f"Running command: {cmd}")
if self.require_confirmation:
confirm()

# Run provisioning flow and collect logs.
res = run(cmd, f"{self.log_dir}/cp_out.log.txt",
f"{self.log_dir}/cp_err.log.txt")
if res.returncode != 0:
logging.warning(f"CP failed with exit code: {res.returncode}.")
confirm()
else:
logging.info("CP completed successfully.")
with tempfile.TemporaryDirectory() as tmpdir:
stdout_logfile = f"{tmpdir}/cp_out.log.txt"
stderr_logfile = f"{tmpdir}/cp_err.log.txt"
res = run(cmd, stdout_logfile, stderr_logfile)

if res.returncode != 0:
logging.warning(f"CP failed with exit code: {res.returncode}.")
confirm()

with open(stdout_logfile, "r") as f:
log_data = f.read()

pattern = r'CHIP_PROBE_DATA:\s*({.*?})'
match = re.search(pattern, log_data)
if match:
json_string = match.group(1)
try:
chip_probe_data = json.loads(json_string)
logging.info(f"CHIP_PROBE_DATA: {chip_probe_data}")
except json.JSONDecodeError as e:
logging.error(f"Failed to parse CHIP_PROBE_DATA: {e}")
confirm()
else:
logging.error("CHIP_PROBE_DATA not found.")
confirm()

if "cp_device_id" not in chip_probe_data:
logging.error("cp_device_id found in CHIP_PROBE_DATA.")
confirm()

logging.info(
f"Updating device ID to: {chip_probe_data['cp_device_id']}")
cp_device_id = self.device_id.from_hexstr(
chip_probe_data["cp_device_id"])
self.device_id.update_base_id(cp_device_id)

self._make_log_dir()
os.rename(stdout_logfile, f"{self.log_dir}/cp_out.log.txt")
os.rename(stderr_logfile, f"{self.log_dir}/cp_err.log.txt")

logging.info(f"CP logs saved to {self.log_dir}.")
logging.info("CP completed successfully.")

def run_ft(self) -> None:
"""Runs the FT provisioning flow on the target DUT."""
Expand Down
57 changes: 51 additions & 6 deletions sw/host/provisioning/orchestrator/src/sku_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from collections import OrderedDict
from dataclasses import dataclass
from typing import Optional

import hjson

Expand All @@ -22,14 +23,16 @@ class SkuConfig:
si_creator: str # valid: any SiliconCreator that exists in product database
package: str # valid: any package that exists in package database
target_lc_state: str # valid: must be in ["dev", "prod", "prod_end"]
dice_ca: OrderedDict # valid: see CaConfig
ext_ca: OrderedDict # valid: see CaConfig
dice_ca: Optional[OrderedDict] # valid: see CaConfig
ext_ca: Optional[OrderedDict] # valid: see CaConfig
token_encrypt_key: str

def __post_init__(self):
# Load CA configs.
self.dice_ca = CaConfig(name="dice_ca", **self.dice_ca)
self.ext_ca = CaConfig(name="ext_ca", **self.ext_ca)
if self.dice_ca:
self.dice_ca = CaConfig(name="dice_ca", **self.dice_ca)
if self.ext_ca:
self.ext_ca = CaConfig(name="ext_ca", **self.ext_ca)

# Load product IDs database.
self._product_ids = None
Expand All @@ -49,7 +52,49 @@ def __post_init__(self):
self._product_ids["si_creator_ids"][self.si_creator], 16)
self.product_id = int(self._product_ids["product_ids"][self.product],
16)
self.package_id = int(self._package_ids[self.package], 16)

if self.package in self._package_ids:
self.package_id = int(self._package_ids[self.package], 16)

@staticmethod
def from_ids(product_id: int, si_creator_id: int,
package_id: int) -> "SkuConfig":
"""Creates a SKU configuration object from product, SiliconCreator, and package IDs."""
# Load product IDs database.
product_ids = None
with open(_PRODUCT_IDS_HJSON, "r") as fp:
product_ids = hjson.load(fp)

# Load package IDs database.
package_ids = None
with open(_PACKAGE_IDS_HJSON, "r") as fp:
package_ids = hjson.load(fp)

# Create SKU configuration object.
product = None
si_creator = None
package = None
for key, value in product_ids["product_ids"].items():
if int(value, 16) == product_id:
product = key
break
for key, value in product_ids["si_creator_ids"].items():
if int(value, 16) == si_creator_id:
si_creator = key
break
for key, value in package_ids.items():
if int(value, 16) == package_id:
package = key
break
sku_config = SkuConfig(name="cp_device_id",
product=product,
si_creator=si_creator,
package=package,
target_lc_state="prod",
dice_ca=OrderedDict(),
ext_ca=OrderedDict(),
token_encrypt_key="")
return sku_config

def validate(self) -> None:
"""Validates this object's attributes."""
Expand All @@ -65,7 +110,7 @@ def validate(self) -> None:
"SiliconCreator ({}) must be product database.".format(
self.si_creator))
# Validate package name.
if self.package not in self._package_ids:
if self.package and self.package not in self._package_ids:
raise ValueError("Package ({}) must be package database.".format(
self.package))
# Validate target_lc_state.
Expand Down
15 changes: 15 additions & 0 deletions sw/host/provisioning/orchestrator/tests/device_id_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ def test_sku_specific_reserved_field_1(self):
def test_pretty_print(self):
self.device_id.pretty_print()

def test_from_hexstr(self):
# Simulate getting a different base ID from CP stage.
din = DeviceIdentificationNumber(year=5,
week=12,
lot=398,
wafer=12,
wafer_x_coord=200,
wafer_y_coord=100)
expected = DeviceId(sku_config=self.sku_config, din=din)
hexstr = expected.to_hexstr()
cp_device_id = DeviceId.from_hexstr(hexstr)

self.device_id.update_base_id(cp_device_id)
self.assertEqual(expected.to_int(), self.device_id.to_int())


if __name__ == '__main__':
unittest.main()
Loading