Skip to content

Commit

Permalink
Merge pull request #50 from EFForg/imsi-analyzer
Browse files Browse the repository at this point in the history
IMSI provided + null cipher analyzer
  • Loading branch information
cooperq authored Jul 19, 2024
2 parents 3cafb97 + 09fdb9d commit af3e47a
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/src/analysis/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::Serialize;

use crate::{diag::MessagesContainer, gsmtap_parser};

use super::{information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer};
use super::{imsi_provided::ImsiProvidedAnalyzer, information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer, null_cipher::NullCipherAnalyzer};

/// Qualitative measure of how severe a Warning event type is.
/// The levels should break down like this:
Expand Down Expand Up @@ -100,6 +100,8 @@ impl Harness {
pub fn new_with_all_analyzers() -> Self {
let mut harness = Harness::new();
harness.add_analyzer(Box::new(LteSib6And7DowngradeAnalyzer{}));
harness.add_analyzer(Box::new(ImsiProvidedAnalyzer{}));
harness.add_analyzer(Box::new(NullCipherAnalyzer{}));
harness
}

Expand Down
37 changes: 37 additions & 0 deletions lib/src/analysis/imsi_provided.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::borrow::Cow;

use telcom_parser::lte_rrc::{PCCH_MessageType, PCCH_MessageType_c1, PagingUE_Identity};

use super::analyzer::{Analyzer, Event, EventType, Severity};
use super::information_element::{InformationElement, LteInformationElement};

pub struct ImsiProvidedAnalyzer {
}

impl Analyzer for ImsiProvidedAnalyzer {
fn get_name(&self) -> Cow<str> {
Cow::from("IMSI Provided")
}

fn get_description(&self) -> Cow<str> {
Cow::from("Tests whether the UE's IMSI was ever provided to the cell")
}

fn analyze_information_element(&mut self, ie: &InformationElement) -> Option<Event> {
let InformationElement::LTE(LteInformationElement::PCCH(pcch_msg)) = ie else {
return None;
};
let PCCH_MessageType::C1(PCCH_MessageType_c1::Paging(paging)) = &pcch_msg.message else {
return None;
};
for record in &paging.paging_record_list.as_ref()?.0 {
if let PagingUE_Identity::Imsi(_) = record.ue_identity {
return Some(Event {
event_type: EventType::QualitativeWarning { severity: Severity::High },
message: "IMSI was provided to cell".to_string(),
})
}
}
None
}
}
2 changes: 2 additions & 0 deletions lib/src/analysis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod analyzer;
pub mod information_element;
pub mod lte_downgrade;
pub mod imsi_provided;
pub mod null_cipher;
115 changes: 115 additions & 0 deletions lib/src/analysis/null_cipher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::borrow::Cow;

use telcom_parser::lte_rrc::{CipheringAlgorithm_r12, DL_CCCH_MessageType, DL_CCCH_MessageType_c1, DL_DCCH_MessageType, DL_DCCH_MessageType_c1, PCCH_MessageType, PCCH_MessageType_c1, PagingUE_Identity, RRCConnectionReconfiguration, RRCConnectionReconfigurationCriticalExtensions, RRCConnectionReconfigurationCriticalExtensions_c1, RRCConnectionReconfiguration_r8_IEs, RRCConnectionRelease_v890_IEs, SCG_Configuration_r12, SecurityConfigHO_v1530HandoverType_v1530, SecurityModeCommand, SecurityModeCommandCriticalExtensions, SecurityModeCommandCriticalExtensions_c1};

use super::analyzer::{Analyzer, Event, EventType, Severity};
use super::information_element::{InformationElement, LteInformationElement};

pub struct NullCipherAnalyzer {
}

impl NullCipherAnalyzer {
fn check_rrc_connection_reconfiguration_cipher(&self, reconfiguration: &RRCConnectionReconfiguration) -> bool {
let RRCConnectionReconfigurationCriticalExtensions::C1(c1) = &reconfiguration.critical_extensions else {
return false;
};
let RRCConnectionReconfigurationCriticalExtensions_c1::RrcConnectionReconfiguration_r8(c1) = c1 else {
return false;
};
if let Some(handover) = &c1.security_config_ho {
let maybe_security_config = match &handover.handover_type {
telcom_parser::lte_rrc::SecurityConfigHOHandoverType::IntraLTE(lte) => lte.security_algorithm_config.as_ref(),
telcom_parser::lte_rrc::SecurityConfigHOHandoverType::InterRAT(rat) => Some(&rat.security_algorithm_config),
};
if let Some(security_config) = maybe_security_config {
if security_config.ciphering_algorithm.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
}
}
// Use map/flatten to dig into a long chain of nested Option types
let maybe_v1250 = c1.non_critical_extension.as_ref()
.map(|v890| v890.non_critical_extension.as_ref()).flatten()
.map(|v920| v920.non_critical_extension.as_ref()).flatten()
.map(|v1020| v1020.non_critical_extension.as_ref()).flatten()
.map(|v1130| v1130.non_critical_extension.as_ref()).flatten();
let Some(v1250) = maybe_v1250 else {
return false;
};

if let Some(SCG_Configuration_r12::Setup(scg_setup)) = v1250.scg_configuration_r12.as_ref() {
let maybe_cipher = scg_setup.scg_config_part_scg_r12.as_ref()
.map(|scg| scg.mobility_control_info_scg_r12.as_ref()).flatten()
.map(|mci| mci.ciphering_algorithm_scg_r12.as_ref()).flatten();
if let Some(cipher) = maybe_cipher {
if cipher.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
}
}

let maybe_v1530_security_config = v1250.non_critical_extension.as_ref()
.map(|v1310| v1310.non_critical_extension.as_ref()).flatten()
.map(|v1430| v1430.non_critical_extension.as_ref()).flatten()
.map(|v1510| v1510.non_critical_extension.as_ref()).flatten()
.map(|v1530| v1530.security_config_ho_v1530.as_ref()).flatten();
let Some(v1530_security_config) = maybe_v1530_security_config else {
return false;
};
let maybe_security_algorithm = match &v1530_security_config.handover_type_v1530 {
SecurityConfigHO_v1530HandoverType_v1530::Intra5GC(intra_5gc) => intra_5gc.security_algorithm_config_r15.as_ref(),
SecurityConfigHO_v1530HandoverType_v1530::Fivegc_ToEPC(to_epc) => Some(&to_epc.security_algorithm_config_r15),
SecurityConfigHO_v1530HandoverType_v1530::Epc_To5GC(to_5gc) => Some(&to_5gc.security_algorithm_config_r15),
};
if let Some(security_algorithm) = maybe_security_algorithm {
if security_algorithm.ciphering_algorithm.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
}
false
}

fn check_security_mode_command_cipher(&self, command: &SecurityModeCommand) -> bool {
let SecurityModeCommandCriticalExtensions::C1(c1) = &command.critical_extensions else {
return false;
};
let SecurityModeCommandCriticalExtensions_c1::SecurityModeCommand_r8(r8) = &c1 else {
return false;
};
if r8.security_config_smc.security_algorithm_config.ciphering_algorithm.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
false
}
}

impl Analyzer for NullCipherAnalyzer {
fn get_name(&self) -> Cow<str> {
Cow::from("Null Cipher")
}

fn get_description(&self) -> Cow<str> {
Cow::from("Tests whether the cell suggests using a null cipher (EEA0)")
}

fn analyze_information_element(&mut self, ie: &InformationElement) -> Option<Event> {
let InformationElement::LTE(LteInformationElement::DlDcch(dcch_msg)) = ie else {
return None;
};
let DL_DCCH_MessageType::C1(c1) = &dcch_msg.message else {
return None;
};
let null_cipher_detected = match c1 {
DL_DCCH_MessageType_c1::RrcConnectionReconfiguration(reconfiguration) => self.check_rrc_connection_reconfiguration_cipher(reconfiguration),
DL_DCCH_MessageType_c1::SecurityModeCommand(command) => self.check_security_mode_command_cipher(command),
_ => return None,
};
if null_cipher_detected {
return Some(Event {
event_type: EventType::QualitativeWarning { severity: Severity::High },
message: "Cell suggested use of null cipher".to_string(),
});
}
None
}
}
16 changes: 16 additions & 0 deletions tools/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
## Rayhunter tools

### `asn1grep.py`: a script for finding a datatype in ASN.1 files

`asn1grep` parses our ASN.1 spec files, then searches for a given datatype by recursively descending through the LTE-RRC types we care about. it then prints out each result as a "path" through the highly nested datatypes.

Setup:
1. `python -m venv .venv && . .venv/bin/activate`
2. `pip install -r requirements.txt`

Usage:
```
» python asn1grep.py IMSI
searching for IMSI
PCCH-Message [message [message.c1 [c1 [c1.paging [paging [pagingRecordList[0] [ [ue-Identity [ue-Identity.imsi [IMSI]]]]]]]]]]
```
67 changes: 67 additions & 0 deletions tools/asn1grep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import asn1tools
import sys

ASN_FILES = [
'../telcom-parser/specs/PC5-RRC-Definitions.asn',
'../telcom-parser/specs/EUTRA-RRC-Definitions.asn',
]

TERMINATING_TYPE_NAMES = [
'DL-CCCH-Message',
'DL-DCCH-Message',
'UL-CCCH-Message',
'UL-DCCH-Message',
'BCCH-BCH-Message',
'BCCH-DL-SCH-Message',
'PCCH-Message',
'MCCH-Message',
'SC-MCCH-Message-r13',
'BCCH-BCH-Message-MBMS',
'BCCH-DL-SCH-Message-BR',
'BCCH-DL-SCH-Message-MBMS',
'SBCCH-SL-BCH-Message',
'SBCCH-SL-BCH-Message-V2X-r14',
]

def load_asn():
return asn1tools.compile_files(ASN_FILES, cache_dir=".cache")

def get_terminating_types(rrc_asn):
return [rrc_asn.types[name] for name in TERMINATING_TYPE_NAMES]

def search_type(haystack, needle):
if haystack.type_name == needle or haystack.name == needle:
return [needle]

result = []
if 'members' in haystack.__dict__:
for name, member in haystack.name_to_member.items():
for member_result in search_type(member, needle):
result.append(f"{haystack.name} ({haystack.type_name}).{name}\n {member_result}")
elif 'root_members' in haystack.__dict__:
for member in haystack.root_members:
for member_result in search_type(member, needle):
result.append(f"{haystack.name} ({haystack.type_name})\n {member_result}")
elif 'element_type' in haystack.__dict__:
for element_result in search_type(haystack.element_type, needle):
result.append(f"{haystack.name}[0] ({haystack.type_name})\n {element_result}")
elif 'inner' in haystack.__dict__:
for inner_result in search_type(haystack.inner, needle):
result.append(inner_result)

return result


if __name__ == "__main__":
type_name = sys.argv[1]
print(f"searching for {type_name}")

rrc_asn = load_asn()
terminating_types = get_terminating_types(rrc_asn)
needle = rrc_asn.types.get(type_name)
if needle == None:
raise ValueError(f"couldn't find type {type}")

for haystack in terminating_types:
for result in search_type(haystack.type, type_name):
print(result + '\n')
4 changes: 4 additions & 0 deletions tools/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
asn1tools==0.166.0
bitstruct==8.19.0
diskcache==5.6.3
pyparsing==3.1.2

0 comments on commit af3e47a

Please sign in to comment.