diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4fea2e0187edc8e232968d579ef2a533286d8d93..4415ee2b015834e14520ad4db72dc6813207586b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,7 +34,8 @@ jobs: os: - ubuntu-latest - macOS-latest - - windows-latest + - windows-2019 + - windows-2022 python-version: - 3.6 - 3.7 @@ -50,6 +51,16 @@ jobs: python-arch: x86 - os: macOS-latest python-arch: x86 + - os: windows-2019 + python-arch: x86 + - os: windows-2019 + python-version: 3.6 + - os: windows-2019 + python-version: 3.7 + - os: windows-2019 + python-version: 3.8 + - os: windows-2019 + python-version: 3.9 steps: - uses: actions/checkout@v2 diff --git a/CHANGELOG.md b/CHANGELOG.md index ea6d7b86e338356b9add3bcafd492612aeb4b59e..1191481b2401d4023f6fa827b6b978de4745a8a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,11 @@ # Changelog -## 1.7.1 - TBD +## 1.8.0 - TBD +* Added support for 256bit keyed encryption ciphers +* Added support for signing with AES GCM +* Now sends the `SMB2_NETNAME_NEGOTIATE_CONTEXT_ID` with the negotiate request +* Adds the Python requirement of [pykrb5](https://github.com/jborean93/pykrb5) for Kerberos support on non-Windows * Fix unpacking security descriptor ACEs with extra data on the end - https://github.com/jborean93/smbprotocol/pull/143 * Set `index_number` in `FileInternalInformation` to be an unsigned integer to match the other structures * Clear out expired DFS referrals to avoid memory leaks and stale DFS information - https://github.com/jborean93/smbprotocol/issues/136 diff --git a/build_helpers/lib.sh b/build_helpers/lib.sh index c9b412f2c79160c6b85896c095782bfbd92b72d4..182d7c5f4d5f75be8de535b4ef89d3b23f3dae87 100755 --- a/build_helpers/lib.sh +++ b/build_helpers/lib.sh @@ -35,7 +35,7 @@ lib::setup::smb_server() { --detach \ --rm \ --publish ${SMB_PORT}:445 \ - --volume $( pwd )/build_helpers:/app \ + --volume $( pwd )/build_helpers:/app:z \ --workdir /app \ archlinux:latest \ /bin/bash \ diff --git a/build_helpers/samba-setup.sh b/build_helpers/samba-setup.sh index 7883be91386343070f310b603366991ec173f591..619a6c0ea03d8a1226094df13d43dff73a960618 100755 --- a/build_helpers/samba-setup.sh +++ b/build_helpers/samba-setup.sh @@ -16,7 +16,7 @@ valid users = @smbgroup server signing = mandatory ea support = yes store dos attributes = yes -vfs objects = xattr_tdb streams_xattr +vfs objects = streams_xattr xattr_tdb log level = 0 [dfs] diff --git a/setup.py b/setup.py index d76adbbf8a2c3536b5ae0df3c68f24ed1a2c62a8..53a36266afbc7b82fff9d54142abb798a7598ec4 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ with open(abs_path('README.md'), mode='rb') as fd: setup( name='smbprotocol', - version='1.7.1', + version='1.8.0', packages=['smbclient', 'smbprotocol'], install_requires=[ 'cryptography>=2.0', diff --git a/smbprotocol/connection.py b/smbprotocol/connection.py index 5146a37e30370087c39935a1644610709e58afaf..680d45f4b12c0017d510d72398eb523d2d0f2aac 100644 --- a/smbprotocol/connection.py +++ b/smbprotocol/connection.py @@ -6,7 +6,6 @@ import binascii import hashlib import hmac import logging -import math import os import struct import time @@ -16,10 +15,6 @@ from collections import ( OrderedDict, ) -from cryptography.exceptions import ( - UnsupportedAlgorithm, -) - from cryptography.hazmat.backends import ( default_backend, ) @@ -77,6 +72,7 @@ from smbprotocol.structure import ( FlagField, IntField, ListField, + TextField, Structure, StructureField, UuidField, @@ -130,6 +126,11 @@ class NegotiateContextType(object): """ SMB2_PREAUTH_INTEGRITY_CAPABILITIES = 0x0001 SMB2_ENCRYPTION_CAPABILITIES = 0x0002 + SMB2_COMPRESSION_CAPABILITIES = 0x0003 + SMB2_NETNAME_NEGOTIATE_CONTEXT_ID = 0x0005 + SMB2_TRANSPORT_CAPABILITIES = 0x0006 + SMB2_RDMA_TRANSFORM_CAPABILITIES = 0x0007 + SMB2_SIGNING_CAPABILITIES = 0x0008 class HashAlgorithms(object): @@ -141,12 +142,6 @@ class HashAlgorithms(object): """ SHA_512 = 0x0001 - @staticmethod - def get_algorithm(hash): - return { - HashAlgorithms.SHA_512: hashlib.sha512 - }[hash] - class Ciphers(object): """ @@ -157,28 +152,20 @@ class Ciphers(object): """ AES_128_CCM = 0x0001 AES_128_GCM = 0x0002 + AES_256_CCM = 0x0003 + AES_256_GCM = 0x0004 - @staticmethod - def get_cipher(cipher): - return { - Ciphers.AES_128_CCM: aead.AESCCM, - Ciphers.AES_128_GCM: aead.AESGCM - }[cipher] - @staticmethod - def get_supported_ciphers(): - supported_ciphers = [] - try: - aead.AESGCM(b"\x00" * 16) - supported_ciphers.append(Ciphers.AES_128_GCM) - except UnsupportedAlgorithm: # pragma: no cover - pass - try: - aead.AESCCM(b"\x00" * 16) - supported_ciphers.append(Ciphers.AES_128_CCM) - except UnsupportedAlgorithm: # pragma: no cover - pass - return supported_ciphers +class SigningAlgorithms: + """ + [MS-SMB2] 2.2.3.1.7 SMB2_SIGNING_CAPABILITIES + + https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/cb9b5d66-b6be-4d18-aa66-8784a871cc10 + 16-bit integer IDs that specify the supported signing algorithms. + """ + HMAC_SHA256 = 0x0000 + AES_CMAC = 0x0001 + AES_GMAC = 0x0002 class SMB2NegotiateRequest(Structure): @@ -301,16 +288,20 @@ class SMB3NegotiateRequest(Structure): context_count = structure['negotiate_context_count'].get_value() context_list = [] for idx in range(0, context_count): - field, data = self._parse_negotiate_context_entry(data, idx) + field, data = self._parse_negotiate_context_entry(data) context_list.append(field) return context_list - def _parse_negotiate_context_entry(self, data, idx): + def _parse_negotiate_context_entry(self, data): data_length = struct.unpack("<H", data[2:4])[0] negotiate_context = SMB2NegotiateContextRequest() negotiate_context.unpack(data[:data_length + 8]) - return negotiate_context, data[8 + data_length:] + padded_size = data_length % 8 + if padded_size != 0: + padded_size = 8 - padded_size + + return negotiate_context, data[8 + data_length + padded_size:] class SMB2NegotiateContextRequest(Structure): @@ -349,11 +340,14 @@ class SMB2NegotiateContextRequest(Structure): def _data_structure_type(self, structure): con_type = structure['context_type'].get_value() - if con_type == \ - NegotiateContextType.SMB2_PREAUTH_INTEGRITY_CAPABILITIES: + if con_type == NegotiateContextType.SMB2_PREAUTH_INTEGRITY_CAPABILITIES: return SMB2PreauthIntegrityCapabilities elif con_type == NegotiateContextType.SMB2_ENCRYPTION_CAPABILITIES: return SMB2EncryptionCapabilities + elif con_type == NegotiateContextType.SMB2_NETNAME_NEGOTIATE_CONTEXT_ID: + return SMB2NetnameNegotiateContextId + elif con_type == NegotiateContextType.SMB2_SIGNING_CAPABILITIES: + return SMB2SigningCapabilities def _padding_size(self, structure): data_size = len(structure['data']) @@ -418,6 +412,48 @@ class SMB2EncryptionCapabilities(Structure): super(SMB2EncryptionCapabilities, self).__init__() +class SMB2NetnameNegotiateContextId(Structure): + """ + [MS-SMB2] 2.2.3.1.4 SMB2_NETNAME_NEGOTIATE_CONTEXT_ID + + https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/ca6726bd-b9cf-43d9-b0bc-d127d3c993b3 + + The SMB2_NETNAME_NEGOTIATE_CONTEXT_ID context is specified in an SMB2 + NEGOTIATE request to indicate the server name the client connects to. + """ + + def __init__(self): + self.fields = OrderedDict([ + ('net_name', TextField()), + ]) + super().__init__() + + +class SMB2SigningCapabilities(Structure): + """ + [MS-SMB2] 2.2.3.1.7 SMB2_SIGNING_CAPABILITIES + + https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-smb2/cb9b5d66-b6be-4d18-aa66-8784a871cc10 + + The SMB2_SIGNING_CAPABILITIES context is specified in an SMB2 NEGOTIATE + request by the client to indicate which signing algorithms the client supports. + """ + + def __init__(self): + self.fields = OrderedDict([ + ('signing_algorithm_count', IntField( + size=2, + default=lambda s: len(s['signing_algorithms'].get_value()), + )), + ('signing_algorithms', ListField( + size=lambda s: s['signing_algorithm_count'].get_value() * 2, + list_count=lambda s: s['signing_algorithm_count'].get_value(), + list_type=EnumField(size=2, enum_type=SigningAlgorithms), + )), + ]) + super().__init__() + + class SMB2NegotiateResponse(Structure): """ [MS-SMB2] v53.0 2017-09-15 @@ -696,10 +732,13 @@ class Connection(object): # The cipher object that was negotiated self.cipher_id = None + # The signing algorithm that was negotiated + self.signing_algorithm_id = None + # Keep track of the message processing thread's potential traceback that it may raise. self._t_exc = None - def connect(self, dialect=None, timeout=60): + def connect(self, dialect=None, timeout=60, preferred_encryption_algos=None, preferred_signing_algos=None): """ Will connect to the target server and negotiate the capabilities with the client. Once setup, the client MUST call the disconnect() @@ -707,11 +746,35 @@ class Connection(object): various connection properties that denote the capabilities of the server. + If no preferred encryption or signing algorithms are specified then + all algorithms are offered during negotiation. Older dialects may not + be offered if a custom encryption or signing algorithm list is + specified without the algorithm required by that dialect. + + By default the following encryption algorithms are used: + + AES_128_GCM + AES_128_CCM (required for SMB 3.0.x) + AES_256_GCM + AES_256_CCM + + By default the following signing algorithms are used: + + AES_GMAC + AES_CMAC (required for SMB 3.0.x) + HMAC_SHA256 (required for SMB 2.x) + :param dialect: If specified, forces the dialect that is negotiated with the server, if not set, then the newest dialect supported by the server is used up to SMB 3.1.1 :param timeout: The timeout in seconds to wait for the initial negotiation process to complete + :param preferred_encryption_algos: A list of encryption algorithm ids + in priority order from highest to lowest. See :class:`Ciphers` for + a list of known identifiers. + :param preferred_signing_algos: A list of signing algorithm ids in + priority order from highest to lowest. + See :class:`SigningAlgorithms` for a list of known identifiers. """ log.info("Setting up transport connection") self.transport = Tcp(self.server_name, self.port, timeout) @@ -722,7 +785,18 @@ class Connection(object): t_worker.start() log.info("Starting negotiation with SMB server") - smb_response = self._send_smb2_negotiate(dialect, timeout) + enc_algos = preferred_encryption_algos or [ + Ciphers.AES_128_GCM, + Ciphers.AES_128_CCM, + Ciphers.AES_256_GCM, + Ciphers.AES_256_CCM, + ] + sign_algos = preferred_signing_algos or [ + SigningAlgorithms.AES_GMAC, + SigningAlgorithms.AES_CMAC, + SigningAlgorithms.HMAC_SHA256, + ] + smb_response = self._send_smb2_negotiate(dialect, timeout, enc_algos, sign_algos) log.info("Negotiated dialect: %s" % str(smb_response['dialect_revision'])) self.dialect = smb_response['dialect_revision'].get_value() @@ -767,15 +841,17 @@ class Connection(object): # SMB 3.1 if self.dialect >= Dialects.SMB_3_1_1: for context in smb_response['negotiate_context_list']: - if context['context_type'].get_value() == \ - NegotiateContextType.SMB2_ENCRYPTION_CAPABILITIES: - cipher_id = context['data']['ciphers'][0] - self.cipher_id = Ciphers.get_cipher(cipher_id) + context_type = context["context_type"].get_value() + + if context_type == NegotiateContextType.SMB2_ENCRYPTION_CAPABILITIES: + self.cipher_id = context['data']['ciphers'][0] self.supports_encryption = self.cipher_id != 0 - else: - hash_id = context['data']['hash_algorithms'][0] - self.preauth_integrity_hash_id = \ - HashAlgorithms.get_algorithm(hash_id) + + elif context_type == NegotiateContextType.SMB2_PREAUTH_INTEGRITY_CAPABILITIES: + self.preauth_integrity_hash_id = context['data']['hash_algorithms'][0] + + elif context_type == NegotiateContextType.SMB2_SIGNING_CAPABILITIES: + self.signing_algorithm_id = context['data']['signing_algorithms'][0] def disconnect(self, close=True): """ @@ -989,7 +1065,8 @@ class Connection(object): if session is None: raise SMBException("Failed to find session %s for message verification" % session_id) - expected = self._generate_signature(header.pack(), session.signing_key) + expected = self._generate_signature(header.pack(), session.signing_key, message_id, + flags.has_flag(Smb2Flags.SMB2_FLAGS_SERVER_TO_REDIR), command) actual = header['signature'].get_value() if actual != expected: raise SMBException("Server message signature could not be verified: %s != %s" @@ -1068,7 +1145,7 @@ class Connection(object): if force_signature or (session and session.signing_required and session.signing_key): header['flags'].set_flag(Smb2Flags.SMB2_FLAGS_SIGNED) b_header = header.pack() + padding - signature = self._generate_signature(b_header, session.signing_key) + signature = self._generate_signature(b_header, session.signing_key, current_id, False, message.COMMAND) # To save on unpacking and re-packing, manually adjust the signature and update the request object for # back-referencing. @@ -1211,13 +1288,37 @@ class Connection(object): for request in self.outstanding_requests.values(): request.response_event.set() - def _generate_signature(self, b_header, signing_key): + def _generate_signature(self, b_header, signing_key, message_id, response, command): b_header = b_header[:48] + (b"\x00" * 16) + b_header[64:] - if self.dialect >= Dialects.SMB_3_0_0: + if self.dialect >= Dialects.SMB_3_1_1 and self.signing_algorithm_id is not None: + sign_id = self.signing_algorithm_id + + elif self.dialect >= Dialects.SMB_3_0_0: + sign_id = SigningAlgorithms.AES_CMAC + + else: + sign_id = SigningAlgorithms.HMAC_SHA256 + + if sign_id == SigningAlgorithms.AES_GMAC: + message_info = 0 + if response: + message_info |= 1 + + if command == Commands.SMB2_CANCEL: + message_info |= 2 + + nonce = b"".join([ + message_id.to_bytes(8, byteorder="little"), + message_info.to_bytes(4, byteorder="little"), + ]) + signature = aead.AESGCM(signing_key).encrypt(nonce, b"", b_header) + + elif sign_id == SigningAlgorithms.AES_CMAC: c = cmac.CMAC(algorithms.AES(signing_key), backend=default_backend()) c.update(b_header) signature = c.finalize() + else: hmac_algo = hmac.new(signing_key, msg=b_header, digestmod=hashlib.sha256) signature = hmac_algo.digest()[:16] @@ -1231,13 +1332,16 @@ class Connection(object): encryption_key = session.encryption_key if self.dialect >= Dialects.SMB_3_1_1: - cipher = self.cipher_id + cipher_id = self.cipher_id else: - cipher = Ciphers.get_cipher(Ciphers.AES_128_CCM) - if cipher == aead.AESGCM: + cipher_id = Ciphers.AES_128_CCM + + if cipher_id in [Ciphers.AES_128_GCM, Ciphers.AES_256_GCM]: + cipher = aead.AESGCM nonce = os.urandom(12) header['nonce'] = nonce + (b"\x00" * 4) else: + cipher = aead.AESCCM nonce = os.urandom(11) header['nonce'] = nonce + (b"\x00" * 5) @@ -1263,13 +1367,18 @@ class Connection(object): raise SMBException(error_msg) if self.dialect >= Dialects.SMB_3_1_1: - cipher = self.cipher_id + cipher_id = self.cipher_id else: - cipher = Ciphers.get_cipher(Ciphers.AES_128_CCM) + cipher_id = Ciphers.AES_128_CCM - nonce_length = 12 if cipher == aead.AESGCM else 11 - nonce = message['nonce'].get_value()[:nonce_length] + if cipher_id in [Ciphers.AES_128_GCM, Ciphers.AES_256_GCM]: + cipher = aead.AESGCM + nonce_length = 12 + else: + cipher = aead.AESCCM + nonce_length = 11 + nonce = message['nonce'].get_value()[:nonce_length] signature = message['signature'].get_value() enc_message = message['data'].get_value() + signature @@ -1277,29 +1386,42 @@ class Connection(object): dec_message = c.decrypt(nonce, enc_message, message.pack()[20:52]) return dec_message - def _send_smb2_negotiate(self, dialect, timeout): + def _send_smb2_negotiate(self, dialect, timeout, encryption_algorithms, signing_algorithms): self.salt = os.urandom(32) if dialect is None: neg_req = SMB3NegotiateRequest() - self.negotiated_dialects = [ + negotiated_dialects = [ Dialects.SMB_2_0_2, Dialects.SMB_2_1_0, Dialects.SMB_3_0_0, Dialects.SMB_3_0_2, Dialects.SMB_3_1_1 ] - highest_dialect = Dialects.SMB_3_1_1 + + if SigningAlgorithms.HMAC_SHA256 not in signing_algorithms: + if Dialects.SMB_2_0_2 in negotiated_dialects: + negotiated_dialects.remove(Dialects.SMB_2_0_2) + if Dialects.SMB_2_1_0 in negotiated_dialects: + negotiated_dialects.remove(Dialects.SMB_2_1_0) + + if ( + SigningAlgorithms.AES_CMAC not in signing_algorithms or + Ciphers.AES_128_CCM not in encryption_algorithms + ): + if Dialects.SMB_3_0_0 in negotiated_dialects: + negotiated_dialects.remove(Dialects.SMB_3_0_0) + if Dialects.SMB_3_0_2 in negotiated_dialects: + negotiated_dialects.remove(Dialects.SMB_3_0_2) else: if dialect >= Dialects.SMB_3_1_1: neg_req = SMB3NegotiateRequest() else: neg_req = SMB2NegotiateRequest() - self.negotiated_dialects = [ - dialect - ] - highest_dialect = dialect - neg_req['dialects'] = self.negotiated_dialects + negotiated_dialects = [dialect] + + highest_dialect = sorted(negotiated_dialects)[-1] + self.negotiated_dialects = neg_req['dialects'] = negotiated_dialects log.info("Negotiating with SMB2 protocol with highest client dialect " "of: %s" % [dialect for dialect, v in vars(Dialects).items() if v == highest_dialect][0]) @@ -1340,17 +1462,32 @@ class Connection(object): enc_cap['context_type'] = \ NegotiateContextType.SMB2_ENCRYPTION_CAPABILITIES enc_cap['data'] = SMB2EncryptionCapabilities() - supported_ciphers = Ciphers.get_supported_ciphers() + supported_ciphers = encryption_algorithms enc_cap['data']['ciphers'] = supported_ciphers + log.debug("Adding encryption capabilities of AES128|256 GCM and " + "AES128|256 CCM to negotiate request") + + netname_id = SMB2NegotiateContextRequest() + netname_id['context_type'] = NegotiateContextType.SMB2_NETNAME_NEGOTIATE_CONTEXT_ID + netname_id['data'] = SMB2NetnameNegotiateContextId() + netname_id['data']['net_name'] = self.server_name + log.debug(f"Adding netname context id of {self.server_name} to negotiate request") + + signing_cap = SMB2NegotiateContextRequest() + signing_cap['context_type'] = NegotiateContextType.SMB2_SIGNING_CAPABILITIES + signing_cap['data'] = SMB2SigningCapabilities() + signing_cap['data']['signing_algorithms'] = signing_algorithms + log.debug("Adding signing algorithms AES_GMAC, AES_CMAC, and HMAC_SHA256 to negotiate request") + # remove extra padding for last list entry - enc_cap['padding'].size = 0 - enc_cap['padding'] = b"" - log.debug("Adding encryption capabilities of AES128 GCM and " - "AES128 CCM to negotiate request") + signing_cap['padding'].size = 0 + signing_cap['padding'] = b"" neg_req['negotiate_context_list'] = [ int_cap, - enc_cap + enc_cap, + netname_id, + signing_cap, ] log.info("Sending SMB2 Negotiate message") diff --git a/smbprotocol/session.py b/smbprotocol/session.py index cba9cc6b69eabaa48b710a8be72122df38aafe74..d6d33bcc902d22e3a3e11c8498643ec38691f638 100644 --- a/smbprotocol/session.py +++ b/smbprotocol/session.py @@ -2,6 +2,7 @@ # Copyright: (c) 2019, Jordan Borean (@jborean93) <jborean93@gmail.com> # MIT License (see LICENSE or https://opensource.org/licenses/MIT) +import hashlib import logging import random import spnego @@ -30,6 +31,7 @@ from smbprotocol import ( from smbprotocol.connection import ( Capabilities, + Ciphers, SecurityMode, ) @@ -251,6 +253,7 @@ class Session(object): self.decryption_key = None self.signing_key = None self.application_key = None + self.full_session_key = None # SMB 3.1.1+ # Preauth integrity value computed for the exhange of SMB2 @@ -321,19 +324,31 @@ class Session(object): self.connection.session_table[self.session_id] = self.connection.preauth_session_table.pop(self.session_id) # session_key is the first 16 bytes, padded 0 if less than 16 - self.session_key = context.session_key[:16].ljust(16, b"\x00") + self.full_session_key = context.session_key + self.session_key = self.full_session_key[:16].ljust(16, b"\x00") if self.connection.dialect >= Dialects.SMB_3_1_1: preauth_hash = b"\x00" * 64 - hash_al = self.connection.preauth_integrity_hash_id for hash_list in [self.connection.preauth_integrity_hash_value, self.preauth_integrity_hash_value]: for message in hash_list: - preauth_hash = hash_al(preauth_hash + message).digest() + # Technically the algo is based on preauth_integrity_hash_id but we only support the 1 + preauth_hash = hashlib.sha512(preauth_hash + message).digest() self.signing_key = self._smb3kdf(self.session_key, b"SMBSigningKey\x00", preauth_hash) self.application_key = self._smb3kdf(self.session_key, b"SMBAppKey\x00", preauth_hash) - self.encryption_key = self._smb3kdf(self.session_key, b"SMBC2SCipherKey\x00", preauth_hash) - self.decryption_key = self._smb3kdf(self.session_key, b"SMBS2CCipherKey\x00", preauth_hash) + + if self.connection.cipher_id in [ + Ciphers.AES_256_CCM, + Ciphers.AES_256_GCM, + ]: + key_length = 32 + key = self.full_session_key + else: + key_length = 16 + key = self.session_key + + self.encryption_key = self._smb3kdf(key, b"SMBC2SCipherKey\x00", preauth_hash, length=key_length) + self.decryption_key = self._smb3kdf(key, b"SMBS2CCipherKey\x00", preauth_hash, length=key_length) elif self.connection.dialect >= Dialects.SMB_3_0_0: self.signing_key = self._smb3kdf(self.session_key, b"SMB2AESCMAC\x00", b"SmbSign\x00") @@ -405,7 +420,7 @@ class Session(object): self._connected = False del self.connection.session_table[self.session_id] - def _smb3kdf(self, ki, label, context): + def _smb3kdf(self, ki, label, context, length=16): """ See SMB 3.x key derivation function https://blogs.msdn.microsoft.com/openspecification/2017/05/26/smb-2-and-smb-3-security-in-windows-10-the-anatomy-of-signing-and-cryptographic-keys/ @@ -413,13 +428,14 @@ class Session(object): :param ki: The session key is the KDK used as an input to the KDF :param label: The purpose of this derived key as bytes string :param context: The context information of this derived key as bytes + :param length: The length of the key to generate string :return: Key derived by the KDF as specified by [SP800-108] 5.1 """ kdf = KBKDFHMAC( algorithm=hashes.SHA256(), mode=Mode.CounterMode, - length=16, + length=length, rlen=4, llen=4, location=CounterLocation.BeforeFixed, diff --git a/tests/test_connection.py b/tests/test_connection.py index 9dea483ce47cb721cae525654908dee68caf9ae4..76f3fabe5c21bc3b04ee81f832ae2069a81200dd 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -2,15 +2,10 @@ # Copyright: (c) 2019, Jordan Borean (@jborean93) <jborean93@gmail.com> # MIT License (see LICENSE or https://opensource.org/licenses/MIT) -import hashlib import os import pytest import uuid -from cryptography.hazmat.primitives.ciphers import ( - aead, -) - from datetime import ( datetime, ) @@ -25,6 +20,8 @@ from smbprotocol.connection import ( HashAlgorithms, NegotiateContextType, Request, + SMB2NetnameNegotiateContextId, + SMB2SigningCapabilities, SecurityMode, SMB2CancelRequest, SMB2EncryptionCapabilities, @@ -35,6 +32,7 @@ from smbprotocol.connection import ( SMB2PreauthIntegrityCapabilities, SMB2TransformHeader, SMB3NegotiateRequest, + SigningAlgorithms, ) from smbprotocol.header import ( @@ -56,30 +54,6 @@ from smbprotocol.session import ( ) -def test_valid_hash_algorithm(): - expected = hashlib.sha512 - actual = HashAlgorithms.get_algorithm(0x1) - assert actual == expected - - -def test_invalid_hash_algorithm(): - with pytest.raises(KeyError) as exc: - HashAlgorithms.get_algorithm(0x2) - assert False # shouldn't be reached - - -def test_valid_cipher(): - expected = aead.AESCCM - actual = Ciphers.get_cipher(0x1) - assert actual == expected - - -def test_invalid_cipher(): - with pytest.raises(KeyError) as exc: - Ciphers.get_cipher(0x3) - assert False # shouldn't be reached - - class TestSMB2NegotiateRequest(object): def test_create_message(self): @@ -163,8 +137,15 @@ class TestSMB3NegotiateRequest(object): enc_cap = SMB2EncryptionCapabilities() enc_cap['ciphers'] = [Ciphers.AES_128_GCM] con_req['data'] = enc_cap + + netname = SMB2NegotiateContextRequest() + netname['context_type'] = NegotiateContextType.SMB2_NETNAME_NEGOTIATE_CONTEXT_ID + netname['data'] = SMB2NetnameNegotiateContextId() + netname['data']['net_name'] = 'café' + message['negotiate_context_list'] = [ - con_req + con_req, + netname, ] expected = b"\x24\x00" \ b"\x05\x00" \ @@ -174,7 +155,7 @@ class TestSMB3NegotiateRequest(object): b"\x33\x33\x33\x33\x33\x33\x33\x33" \ b"\x33\x33\x33\x33\x33\x33\x33\x33" \ b"\x70\x00\x00\x00" \ - b"\x01\x00" \ + b"\x02\x00" \ b"\x00\x00" \ b"\x02\x02" \ b"\x10\x02" \ @@ -184,9 +165,12 @@ class TestSMB3NegotiateRequest(object): b"\x00\x00" \ b"\x02\x00\x04\x00\x00\x00\x00\x00" \ b"\x01\x00\x02\x00" \ - b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" \ + b"\x05\x00\x08\x00\x00\x00\x00\x00" \ + b"\x63\x00\x61\x00\x66\x00\xe9\x00" + actual = message.pack() - assert len(message) == 64 + assert len(message) == 80 assert actual == expected def test_create_message_one_dialect(self): @@ -236,7 +220,7 @@ class TestSMB3NegotiateRequest(object): b"\x33\x33\x33\x33\x33\x33\x33\x33" \ b"\x33\x33\x33\x33\x33\x33\x33\x33" \ b"\x70\x00\x00\x00" \ - b"\x01\x00" \ + b"\x02\x00" \ b"\x00\x00" \ b"\x02\x02" \ b"\x10\x02" \ @@ -246,9 +230,12 @@ class TestSMB3NegotiateRequest(object): b"\x00\x00" \ b"\x02\x00\x04\x00\x00\x00\x00\x00" \ b"\x01\x00\x02\x00" \ - b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" \ + b"\x05\x00\x08\x00\x00\x00\x00\x00" \ + b"\x63\x00\x61\x00\x66\x00\xe9\x00" + actual.unpack(data) - assert len(actual) == 60 + assert len(actual) == 76 assert actual['structure_size'].get_value() == 36 assert actual['dialect_count'].get_value() == 5 assert actual['security_mode'].get_value() == \ @@ -258,7 +245,7 @@ class TestSMB3NegotiateRequest(object): assert actual['client_guid'].get_value() == \ uuid.UUID(bytes=b"\x33" * 16) assert actual['negotiate_context_offset'].get_value() == 112 - assert actual['negotiate_context_count'].get_value() == 1 + assert actual['negotiate_context_count'].get_value() == 2 assert actual['reserved2'].get_value() == 0 assert actual['dialects'].get_value() == [ Dialects.SMB_2_0_2, @@ -269,7 +256,7 @@ class TestSMB3NegotiateRequest(object): ] assert actual['padding'].get_value() == b"\x00\x00" - assert len(actual['negotiate_context_list'].get_value()) == 1 + assert len(actual['negotiate_context_list'].get_value()) == 2 neg_con = actual['negotiate_context_list'][0] assert isinstance(neg_con, SMB2NegotiateContextRequest) assert len(neg_con) == 12 @@ -282,6 +269,15 @@ class TestSMB3NegotiateRequest(object): assert neg_con['data']['cipher_count'].get_value() == 1 assert neg_con['data']['ciphers'].get_value() == [Ciphers.AES_128_GCM] + net_name = actual['negotiate_context_list'][1] + assert isinstance(net_name, SMB2NegotiateContextRequest) + assert len(net_name) == 16 + assert net_name['context_type'].get_value() == NegotiateContextType.SMB2_NETNAME_NEGOTIATE_CONTEXT_ID + assert net_name['data_length'].get_value() == 8 + assert net_name['reserved'].get_value() == 0 + assert isinstance(net_name['data'].get_value(), SMB2NetnameNegotiateContextId) + assert net_name['data']['net_name'].get_value() == 'café' + class TestSMB2NegotiateContextRequest(object): @@ -323,14 +319,14 @@ class TestSMB2NegotiateContextRequest(object): def test_parse_message_invalid_context_type(self): actual = SMB2NegotiateContextRequest() - data = b"\x03\x00" \ + data = b"\xFF\xFF" \ b"\x04\x00" \ b"\x00\x00\x00\x00" \ b"\x01\x00" \ b"\x02\x00" with pytest.raises(Exception) as exc: actual.unpack(data) - assert str(exc.value) == "Enum value 3 does not exist in enum type " \ + assert str(exc.value) == "Enum value 65535 does not exist in enum type " \ "<class 'smbprotocol.connection." \ "NegotiateContextType'>" @@ -398,6 +394,58 @@ class TestSMB2EncryptionCapabilities(object): ] +class TestSMB2NetnameNegotiateContextId: + + def test_create_message(self): + message = SMB2NetnameNegotiateContextId() + message["net_name"] = "hostname" + expected = b"\x68\x00\x6F\x00\x73\x00\x74\x00" \ + b"\x6E\x00\x61\x00\x6D\x00\x65\x00" + actual = message.pack() + assert len(message) == 16 + assert actual == expected + + def test_parse_message(self): + actual = SMB2NetnameNegotiateContextId() + data = b"\x68\x00\x6F\x00\x73\x00\x74\x00" \ + b"\x6E\x00\x61\x00\x6D\x00\x65\x00" + actual.unpack(data) + assert len(actual) == 16 + assert actual["net_name"].get_value() == "hostname" + + +class TestSMB2SigningCapabilities: + + def test_create_message(self): + message = SMB2SigningCapabilities() + message["signing_algorithms"] = [ + SigningAlgorithms.AES_GMAC, + SigningAlgorithms.AES_CMAC, + SigningAlgorithms.HMAC_SHA256, + ] + expected = b"\x03\x00" \ + b"\x02\x00" \ + b"\x01\x00" \ + b"\x00\x00" + actual = message.pack() + assert len(message) == 8 + assert actual == expected + + def test_parse_message(self): + actual = SMB2SigningCapabilities() + data = b"\x03\x00" \ + b"\x02\x00" \ + b"\x01\x00" \ + b"\x00\x00" + actual.unpack(data) + assert len(actual) == 8 + assert actual["signing_algorithms"].get_value() == [ + SigningAlgorithms.AES_GMAC, + SigningAlgorithms.AES_CMAC, + SigningAlgorithms.HMAC_SHA256, + ] + + class TestSMB2NegotiateResponse(object): def test_create_message(self): @@ -1079,51 +1127,61 @@ class TestConnection(object): finally: connection.disconnect(True) - def test_encrypt_ccm(self, monkeypatch): + @pytest.mark.parametrize('key, cipher, signature, data', [ + (b"\xff" * 16, Ciphers.AES_128_CCM, b"\xc8\x73\x0c\x9b\xa7\xe5\x9f\x1c\xfd\x37\x51\xa1\x95\xf2\xb3\xac", + b"\x21\x91\xe3\x0e"), + (b"\xff" * 32, Ciphers.AES_256_CCM, b"\x3E\xFB\x47\x97\x51\x8A\xAB\x05\xC5\x48\xA7\xFC\x20\x74\xF5\x93", + b"\x2F\x58\x41\xD7"), + ], ids=['AES128_CCM', 'AES256_CCM']) + def test_encrypt_ccm(self, key, cipher, signature, data, monkeypatch): def mockurandom(length): return b"\xff" * length monkeypatch.setattr(os, 'urandom', mockurandom) connection = Connection(uuid.uuid4(), "server", 445) connection.dialect = Dialects.SMB_3_1_1 - connection.cipher_id = Ciphers.get_cipher(Ciphers.AES_128_CCM) + connection.cipher_id = cipher session = Session(connection, "user", "pass") session.session_id = 1 - session.encryption_key = b"\xff" * 16 + session.encryption_key = key expected = SMB2TransformHeader() - expected['signature'] = b"\xc8\x73\x0c\x9b\xa7\xe5\x9f\x1c" \ - b"\xfd\x37\x51\xa1\x95\xf2\xb3\xac" + expected['signature'] = signature expected['nonce'] = b"\xff" * 11 + b"\x00" * 5 expected['original_message_size'] = 4 expected['flags'] = 1 expected['session_id'] = 1 - expected['data'] = b"\x21\x91\xe3\x0e" + expected['data'] = data actual = connection._encrypt(b"\x01\x02\x03\x04", session) assert isinstance(actual, SMB2TransformHeader) assert actual.pack() == expected.pack() - def test_encrypt_gcm(self, monkeypatch): + @pytest.mark.parametrize('key, cipher, signature, data', [ + (b"\xff" * 16, Ciphers.AES_128_GCM, b"\x39\xd8\x32\x34\xd7\x53\xd0\x8e\xc0\xfc\xbe\x33\x01\x5f\x19\xbd", + b"\xda\x26\x57\x33"), + (b"\xff" * 32, Ciphers.AES_256_GCM, b"\x45\xE5\xB7\x23\x05\x2E\xCA\xD0\x1E\xEF\xAD\x6F\x04\x87\xE3\x2D", + b"\xBC\x39\xBD\x81"), + ], ids=['AES128_CCM', 'AES256_CCM']) + def test_encrypt_gcm(self, key, cipher, signature, data, monkeypatch): def mockurandom(length): return b"\xff" * length monkeypatch.setattr(os, 'urandom', mockurandom) connection = Connection(uuid.uuid4(), "server", 445) connection.dialect = Dialects.SMB_3_1_1 - connection.cipher_id = Ciphers.get_cipher(Ciphers.AES_128_GCM) + connection.cipher_id = cipher session = Session(connection, "user", "pass") session.session_id = 1 - session.encryption_key = b"\xff" * 16 + session.encryption_key = key expected = SMB2TransformHeader() - expected['signature'] = b"\x39\xd8\x32\x34\xd7\x53\xd0\x8e" \ - b"\xc0\xfc\xbe\x33\x01\x5f\x19\xbd" + expected['signature'] = signature expected['nonce'] = b"\xff" * 12 + b"\x00" * 4 expected['original_message_size'] = 4 expected['flags'] = 1 expected['session_id'] = 1 - expected['data'] = b"\xda\x26\x57\x33" + expected['data'] = data actual = connection._encrypt(b"\x01\x02\x03\x04", session) assert isinstance(actual, SMB2TransformHeader) diff --git a/tests/test_smbclient_os.py b/tests/test_smbclient_os.py index 18f6af8096d7c2f58bce3c61ef5d7e4d71323d08..9b4eb57dd4f9017803cfac55fb9a23d9dc622204 100644 --- a/tests/test_smbclient_os.py +++ b/tests/test_smbclient_os.py @@ -826,8 +826,6 @@ def test_open_file_unbuffered_text_file(smb_share): smbclient.open_file("%s\\file.txt" % smb_share, mode='w', buffering=0) -@pytest.mark.skipif(os.name != "nt" and not os.environ.get('SMB_FORCE', False), - reason="Bug on latest Samba https://bugzilla.samba.org/show_bug.cgi?id=14877") def test_open_file_with_ads(smb_share): filename = "%s\\file.txt" % smb_share with smbclient.open_file(filename, mode='w') as fd: diff --git a/tests/test_tree.py b/tests/test_tree.py index 6ec25aa85a3061f5d560ef4128df1c31adde0686..948ebce90c2257fedf9de0123628d1361b5b0929 100644 --- a/tests/test_tree.py +++ b/tests/test_tree.py @@ -10,7 +10,9 @@ from smbprotocol import ( ) from smbprotocol.connection import ( + Ciphers, Connection, + SigningAlgorithms, ) from smbprotocol.exceptions import ( @@ -266,3 +268,50 @@ class TestTreeConnect(object): finally: connection.disconnect(True) tree.disconnect() # test that disconnect can be run mutliple times + + @pytest.mark.parametrize('cipher', [ + Ciphers.AES_128_CCM, + Ciphers.AES_128_GCM, + Ciphers.AES_256_CCM, + Ciphers.AES_256_GCM, + ], ids=['AES_128_CCM', 'AES_128_GCM', 'AES_256_CCM', 'AES_256_GCM']) + def test_encryption(self, cipher, smb_real): + connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) + connection.connect(preferred_encryption_algos=[cipher]) + + try: + if connection.cipher_id == 0: + pytest.skip("Server did not support encryption requested") + + assert connection.cipher_id == cipher + + session = Session(connection, smb_real[0], smb_real[1]) + tree = TreeConnect(session, smb_real[4]) + session.connect() + tree.connect() + + finally: + connection.disconnect(True) + + @pytest.mark.parametrize('algo', [ + SigningAlgorithms.AES_GMAC, + SigningAlgorithms.AES_CMAC, + SigningAlgorithms.HMAC_SHA256, + ], ids=['AES_GMAC', 'AES_CMAC', 'HMAC_SHA256']) + def test_signing(self, algo, smb_real): + connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) + connection.connect(preferred_signing_algos=[algo]) + + try: + if connection.signing_algorithm_id is None: + pytest.skip("Server did not support signing algo requested") + + assert connection.signing_algorithm_id == algo + + session = Session(connection, smb_real[0], smb_real[1], require_encryption=False) + tree = TreeConnect(session, smb_real[4]) + session.connect() + tree.connect() + + finally: + connection.disconnect(True)