diff --git a/librespot/crypto/CipherPair.py b/librespot/crypto/CipherPair.py index 8725101..878f273 100644 --- a/librespot/crypto/CipherPair.py +++ b/librespot/crypto/CipherPair.py @@ -10,14 +10,12 @@ class CipherPair: receive_nonce = 0 def __init__(self, send_key: bytes, receive_key: bytes): - # self.send_cipher = Shannon() - # self.send_cipher.key(send_key) - self.send_cipher = Shannon(send_key) + self.send_cipher = Shannon() + self.send_cipher.key(send_key) self.send_nonce = 0 - # self.receive_cipher = Shannon() - # self.receive_cipher.key(receive_key) - self.receive_cipher = Shannon(receive_key) + self.receive_cipher = Shannon() + self.receive_cipher.key(receive_key) self.receive_nonce = 0 def send_encoded(self, conn, cmd: bytes, payload: bytes): @@ -31,7 +29,6 @@ class CipherPair: buffer = self.send_cipher.encrypt(buffer) - # mac = self.send_cipher.finish(bytes(4)) mac = self.send_cipher.finish(4) conn.write(buffer) diff --git a/librespot/crypto/Shannon.py b/librespot/crypto/Shannon.py index 974bd52..1e8ad5b 100644 --- a/librespot/crypto/Shannon.py +++ b/librespot/crypto/Shannon.py @@ -1,495 +1,322 @@ -""" - Shannon: Shannon stream cipher and MAC -- reference implementation, ported from C code written by Greg Rose - https://github.com/sashahilton00/spotify-connect-resources/blob/master/Shannon-1.0/ShannonRef.c - - Copyright 2017, Dmitry Borisov - - THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED - WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE AND AGAINST - INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR - CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -""" - -import struct, \ - copy - -# Constants -N = 16 -INITKONST = 0x6996c53a -KEYP = 13 # where to insert key/MAC/counter words -FOLD = N # how many iterations of folding to do +import struct +import typing class Shannon: - @staticmethod - def ROTL(w, x): - return ((w << x) | (w >> (32 - x))) & 0xFFFFFFFF + N = 16 + FOLD = N + INITKONST = 0x6996c53a + KEYP = 13 - @staticmethod - def ROTR(w, x): - return ((w >> x) | (w << (32 - x))) & 0xFFFFFFFF + R: list + CRC: list + initR: list + konst: int + sbuf: int + mbuf: int + nbuf: int - """ Nonlinear transform (sbox) of a word. - There are two slightly different combinations. """ + def __init__(self): + self.R = [0 for _ in range(self.N)] + self.CRC = [0 for _ in range(self.N)] + self.initR = [0 for _ in range(self.N)] - @staticmethod - def sbox1(w): - w ^= Shannon.ROTL(w, 5) | Shannon.ROTL(w, 7) - w ^= Shannon.ROTL(w, 19) | Shannon.ROTL(w, 22) - return w + def rotl(self, i: int, distance: int): + return ((i << distance) | (i >> (32 - distance))) & 0xffffffff - """ Nonlinear transform (sbox) of a word. - There are two slightly different combinations. """ + def sbox(self, i: int): + i ^= self.rotl(i, 5) | self.rotl(i, 7) + i ^= self.rotl(i, 19) | self.rotl(i, 22) - @staticmethod - def sbox2(w): - w ^= Shannon.ROTL(w, 7) | Shannon.ROTL(w, 22) - w ^= Shannon.ROTL(w, 5) | Shannon.ROTL(w, 19) - return w + return i - """ initialise to known state """ + def sbox2(self, i: int): + i ^= self.rotl(i, 7) | self.rotl(i, 22) + i ^= self.rotl(i, 5) | self.rotl(i, 19) - def _initstate(self): - global N, \ - INITKONST + return i - # Generate fibonacci numbers up to N - self._R = [1, 1] - for x in range(1, N - 1): - self._R.append(self._R[x] + self._R[x - 1]) + def cycle(self): + t: int - self._konst = INITKONST + t = self.R[12] ^ self.R[13] ^ self.konst + t = self.sbox(t) ^ self.rotl(self.R[0], 1) - """ cycle the contents of the register and calculate output word in _sbuf. """ + for i in range(1, self.N): + self.R[i - 1] = self.R[i] - def _cycle(self): - # nonlinear feedback function - t = self._R[12] ^ self._R[13] ^ self._konst - t = Shannon.sbox1(t) ^ Shannon.ROTL(self._R[0], 1) + self.R[self.N - 1] = t - # Shift to the left - self._R = self._R[1:] + [t] - t = Shannon.sbox2(self._R[2] ^ self._R[15]) - self._R[0] ^= t - self._sbuf = t ^ self._R[8] ^ self._R[12] + t = self.sbox2(self.R[2] ^ self.R[15]) + self.R[0] ^= t + self.sbuf = t ^ self.R[8] ^ self.R[12] - """ The Shannon MAC function is modelled after the concepts of Phelix and SHA. - Basically, words to be accumulated in the MAC are incorporated in two - different ways: - 1. They are incorporated into the stream cipher register at a place - where they will immediately have a nonlinear effect on the state - 2. They are incorporated into bit-parallel CRC-16 registers; the - contents of these registers will be used in MAC finalization. """ - """ Accumulate a CRC of input words, later to be fed into MAC. - This is actually 32 parallel CRC-16s, using the IBM CRC-16 - polynomial x^16 + x^15 + x^2 + 1. """ + def crc_func(self, i: int): + t: int - def _crcfunc(self, i): - t = self._CRC[0] ^ self._CRC[2] ^ self._CRC[15] ^ i - # Accumulate CRC of input - self._CRC = self._CRC[1:] + [t] + t = self.CRC[0] ^ self.CRC[2] ^ self.CRC[15] ^ i - """ Normal MAC word processing: do both stream register and CRC. """ + for j in range(1, self.N): + self.CRC[j - 1] = self.CRC[j] - def _macfunc(self, i): - global KEYP + self.CRC[self.N - 1] = t - self._crcfunc(i) - self._R[KEYP] ^= i + def mac_func(self, i: int): + self.crc_func(i) - """ extra nonlinear diffusion of register for key and MAC """ + self.R[self.KEYP] ^= i - def _diffuse(self): - global FOLD + def init_state(self): + self.R[0] = 1 + self.R[1] = 1 - for i in range(FOLD): - self._cycle() + for i in range(2, self.N): + self.R[i] = self.R[i - 1] + self.R[i - 2] - """ Common actions for loading key material - Allow non-word-multiple key and nonce material. - Note also initializes the CRC register as a side effect. """ + self.konst = self.INITKONST - def _loadkey(self, key): - global KEYP, \ - N + def save_state(self): + for i in range(self.N): + self.initR[i] = self.R[i] + + def reload_state(self): + for i in range(self.N): + self.R[i] = self.initR[i] + + def gen_konst(self): + self.konst = self.R[0] + + def add_key(self, k: int): + self.R[self.KEYP] ^= k + + def diffuse(self): + for i in range(self.FOLD): + self.cycle() + + def load_key(self, key: bytes): + extra = bytearray(4) + i: int + j: int + t: int - # Pad key with 00s to align on 4 bytes and add key_len padding_size = int((len(key) + 3) / 4) * 4 - len(key) - key = key + (b'\x00' * padding_size) + struct.pack("I", nonce)) - self._R = copy.copy(self._initR) - self._konst = INITKONST - self._loadkey(nonce) - self._konst = self._R[0] - self._nbuf = 0 - self._mbuf = 0 + self.reload_state() - """ Encrypt small chunk """ + self.konst = self.INITKONST - def _encrypt_chunk(self, chunk): - result = [] - for c in chunk: - self._mbuf ^= c << (32 - self._nbuf) - result.append(c ^ (self._sbuf >> (32 - self._nbuf)) & 0xFF) - self._nbuf -= 8 + self.load_key(nonce) - return result + self.gen_konst() - """ Combined MAC and encryption. - Note that plaintext is accumulated for MAC. """ + self.nbuf = 0 - def encrypt(self, buf): - # handle any previously buffered bytes - result = [] - if self._nbuf != 0: - head = buf[:(self._nbuf >> 3)] - buf = buf[(self._nbuf >> 3):] - result = self._encrypt_chunk(head) - if self._nbuf != 0: - return bytes(result) + def encrypt(self, buffer: bytes, n: int = None): + if n is None: + return self.encrypt(buffer, len(buffer)) - # LFSR already cycled - self._macfunc(self._mbuf) + buffer = bytearray(buffer) - # Handle body i = 0 - while len(buf) >= 4: - self._cycle() - t = struct.unpack("> (32 - self.nbuf)) & 0xff - return bytes(result) + i += 1 - """ Decrypt small chunk """ + self.nbuf -= 8 - def _decrypt_chunk(self, chunk): - result = [] - for c in chunk: - result.append(c ^ ((self._sbuf >> (32 - self._nbuf)) & 0xFF)) - self._mbuf ^= result[-1] << (32 - self._nbuf) - self._nbuf -= 8 + n -= 1 - return result + if self.nbuf != 0: + return - """ Combined MAC and decryption. - Note that plaintext is accumulated for MAC. """ + self.mac_func(self.mbuf) - def decrypt(self, buf): - # handle any previously buffered bytes - result = [] - if self._nbuf != 0: - head = buf[:(self._nbuf >> 3)] - buf = buf[(self._nbuf >> 3):] - result = self._decrypt_chunk(head) - if self._nbuf != 0: - return bytes(result) + j = n & ~0x03 - # LFSR already cycled - self._macfunc(self._mbuf) + while i < j: + self.cycle() + + t = ((buffer[i + 3] & 0xFF) << 24) | \ + ((buffer[i + 2] & 0xFF) << 16) | \ + ((buffer[i + 1] & 0xFF) << 8) | \ + (buffer[i] & 0xFF) + + self.mac_func(t) + + t ^= self.sbuf + + buffer[i + 3] = (t >> 24) & 0xFF + buffer[i + 2] = (t >> 16) & 0xFF + buffer[i + 1] = (t >> 8) & 0xFF + buffer[i] = t & 0xFF + + i += 4 + + n &= 0x03 + + if n != 0: + self.cycle() + + self.mbuf = 0 + self.nbuf = 32 + + while self.nbuf != 0 and n != 0: + self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf) + buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff + + i += 1 + + self.nbuf -= 8 + + n -= 1 + return bytes(buffer) + + def decrypt(self, buffer: bytes, n: int = None): + if n is None: + return self.decrypt(buffer, len(buffer)) + + buffer = bytearray(buffer) - # Handle whole words i = 0 - while len(buf) >= 4: - self._cycle() - t = struct.unpack("> (32 - self.nbuf)) & 0xff + self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf) - return bytes(result) + i += 1 - """ Having accumulated a MAC, finish processing and return it. - Note that any unprocessed bytes are treated as if - they were encrypted zero bytes, so plaintext (zero) is accumulated. """ + self.nbuf -= 8 - def finish(self, buf_len): - global KEYP, \ - INITKONST + n -= 1 - # handle any previously buffered bytes - if self._nbuf != 0: - # LFSR already cycled - self._macfunc(self._mbuf) + if self.nbuf != 0: + return - # perturb the MAC to mark end of input. - # Note that only the stream register is updated, not the CRC. This is an - # action that can't be duplicated by passing in plaintext, hence - # defeating any kind of extension attack. - self._cycle() - self._R[KEYP] ^= INITKONST ^ (self._nbuf << 3) - self._nbuf = 0 + self.mac_func(self.mbuf) - # now add the CRC to the stream register and diffuse it - for i in range(N): - self._R[i] ^= self._CRC[i] + j = n & ~0x03 - self._diffuse() + while i < j: + self.cycle() + + t = ((buffer[i + 3] & 0xFF) << 24) | \ + ((buffer[i + 2] & 0xFF) << 16) | \ + ((buffer[i + 1] & 0xFF) << 8) | \ + (buffer[i] & 0xFF) + + t ^= self.sbuf + + self.mac_func(t) + + buffer[i + 3] = (t >> 24) & 0xFF + buffer[i + 2] = (t >> 16) & 0xFF + buffer[i + 1] = (t >> 8) & 0xFF + buffer[i] = t & 0xFF + + i += 4 + + n &= 0x03 + + if n != 0: + self.cycle() + + self.mbuf = 0 + self.nbuf = 32 + + while self.nbuf != 0 and n != 0: + buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff + self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf) + + i += 1 + + self.nbuf -= 8 + + n -= 1 + + return bytes(buffer) + + def finish(self, n: int): + buffer = bytearray(4) - result = [] - # produce output from the stream buffer i = 0 - for i in range(0, buf_len, 4): - self._cycle() - if i + 4 <= buf_len: - result += struct.pack(" 0: + self.cycle() + + if n >= 4: + buffer[i + 3] = (self.sbuf >> 24) & 0xff + buffer[i + 2] = (self.sbuf >> 16) & 0xff + buffer[i + 1] = (self.sbuf >> 8) & 0xff + buffer[i] = self.sbuf & 0xff + + n -= 4 + i += 4 else: - sbuf = self._sbuf - for j in range(i, buf_len): - result.append(sbuf & 0xFF) - sbuf >>= 8 - - return bytes(result) + for j in range(n): + buffer[i + j] = (self.sbuf >> (i * 8)) & 0xff + break + return bytes(buffer) -if __name__ == '__main__': - TESTSIZE = 23 +if __name__ == "__main__": TEST_KEY = b"test key 128bits" TEST_PHRASE = b'\x00' * 20 - - sh = Shannon( - bytes([ - 133, 199, 15, 101, 207, 100, 229, 237, 15, 249, 248, 155, 76, 170, - 62, 189, 239, 251, 147, 213, 22, 186, 157, 47, 218, 198, 235, 14, - 171, 50, 11, 121 - ])) - sh.set_nonce(0) - p1 = sh.decrypt( - bytes([ - 235, - 94, - 210, - 19, - 246, - 203, - 195, - 35, - 22, - 215, - 80, - 69, - 158, - 247, - 110, - 146, - 241, - 101, - 199, - 37, - 67, - 92, - 5, - 197, - 112, - 244, - 77, - 185, - 197, - 118, - 119, - 56, - 164, - 246, - 159, - 242, - 56, - 200, - 39, - 27, - 141, - 191, - 37, - 244, - 244, - 164, - 44, - 250, - 59, - 227, - 245, - 155, - 239, - 155, - 137, - 85, - 244, - 29, - 52, - 233, - 180, - 119, - 166, - 46, - 252, - 24, - 141, - 20, - 135, - 73, - 144, - 10, - 176, - 79, - 88, - 228, - 140, - 62, - 173, - 192, - 117, - 116, - 152, - 182, - 246, - 183, - 88, - 90, - 73, - 51, - 159, - 83, - 227, - 222, - 140, - 48, - 157, - 137, - 185, - 131, - 201, - 202, - 122, - 112, - 207, - 231, - 153, - 155, - 9, - 163, - 225, - 73, - 41, - 252, - 249, - 65, - 33, - 102, - 83, - 100, - 36, - 115, - 174, - 191, - 43, - 250, - 113, - 229, - 146, - 47, - 154, - 175, - 55, - 101, - 73, - 164, - 49, - 234, - 103, - 32, - 53, - 190, - 236, - 47, - 210, - 78, - 141, - 0, - 176, - 255, - 79, - 151, - 159, - 66, - 20, - ])) - print([hex(x) for x in p1]) - print([hex(x) for x in sh.finish(4)]) - sh.set_nonce(1) - print([hex(x) for x in sh.decrypt(bytes([173, 184, 50]))]) - - sh = Shannon(TEST_KEY) - sh.set_nonce(0) - encr = [sh.encrypt(bytes([x])) for x in TEST_PHRASE] - print('Encrypted 1-by-1 (len %d)' % len(encr), [hex(x[0]) for x in encr]) - print(' sbuf %08x' % sh._sbuf) - print(' MAC', [hex(x) for x in sh.finish(4)]) - - sh.set_nonce(0) + sh = Shannon() + sh.key(TEST_KEY) + sh.nonce(0) encr = sh.encrypt(TEST_PHRASE) - print('Encrypted whole (len %d)' % len(encr), [hex(x) for x in encr]) - print(' sbuf %08x' % sh._sbuf) - print(' MAC', [hex(x) for x in sh.finish(4)]) - - sh.set_nonce(0) - print('Decrypted whole', [hex(x) for x in sh.decrypt(encr)]) - print(' MAC', [hex(x) for x in sh.finish(4)]) - - sh.set_nonce(0) - decr = [sh.decrypt(bytes([x])) for x in encr] - print('Decrypted 1-by-1', [hex(x[0]) for x in decr]) - print(' MAC', [hex(x) for x in sh.finish(4)]) + print(encr) diff --git a/librespot/crypto/Shannon_DEV.py b/librespot/crypto/Shannon_DEV.py deleted file mode 100644 index c8a80cc..0000000 --- a/librespot/crypto/Shannon_DEV.py +++ /dev/null @@ -1,311 +0,0 @@ -import struct - - -class Shannon: - n = 16 - fold = n - initkonst = 0x6996c53a - keyp = 13 - - r: list - crc: list - initr: list - konst: int - sbuf: int - mbuf: int - nbuf: int - - def __init__(self): - self.r = [0 for _ in range(self.n)] - self.crc = [0 for _ in range(self.n)] - self.initr = [0 for _ in range(self.n)] - - def rotl(self, i: int, distance: int): - return ((i << distance) | (i >> (32 - distance))) & 0xffffffff - - def sbox(self, i: int): - i ^= self.rotl(i, 5) | self.rotl(i, 7) - i ^= self.rotl(i, 19) | self.rotl(i, 22) - - return i - - def sbox2(self, i: int): - i ^= self.rotl(i, 7) | self.rotl(i, 22) - i ^= self.rotl(i, 5) | self.rotl(i, 19) - - return i - - def cycle(self): - t: int - - t = self.r[12] ^ self.r[13] ^ self.konst - t = self.sbox(t) ^ self.rotl(self.r[0], 1) - - for i in range(1, self.n): - self.r[i - 1] = self.r[i] - - self.r[self.n - 1] = t - - t = self.sbox2(self.r[2] ^ self.r[15]) - self.r[0] ^= t - self.sbuf = t ^ self.r[8] ^ self.r[12] - - def crc_func(self, i: int): - t: int - - t = self.crc[0] ^ self.crc[2] ^ self.crc[15] ^ i - - for j in range(1, self.n): - self.crc[j - 1] = self.crc[j] - - self.crc[self.n - 1] = t - - def mac_func(self, i: int): - self.crc_func(i) - - self.r[self.keyp] ^= i - - def init_state(self): - self.r[0] = 1 - self.r[1] = 1 - - for i in range(2, self.n): - self.r[i] = self.r[i - 1] + self.r[i - 2] - - self.konst = self.initkonst - - def save_state(self): - for i in range(self.n): - self.initr[i] = self.r[i] - - def reload_state(self): - for i in range(self.n): - self.r[i] = self.initr[i] - - def gen_konst(self): - self.konst = self.r[0] - - def add_key(self, k: int): - self.r[self.keyp] ^= k - - def diffuse(self): - for i in range(self.fold): - self.cycle() - - def load_key(self, key: bytes): - extra = bytearray(4) - i: int - j: int - t: int - - padding_size = int((len(key) + 3) / 4) * 4 - len(key) - key = key + (b"\x00" * padding_size) + struct.pack("> (32 - self.nbuf)) & 0xff - - i += 1 - - self.nbuf -= 8 - - n -= 1 - - if self.nbuf != 0: - return - - self.mac_func(self.mbuf) - - j = n & ~0x03 - - while i < j: - self.cycle() - - t = ((buffer[i + 3] & 0xFF) << 24) | \ - ((buffer[i + 2] & 0xFF) << 16) | \ - ((buffer[i + 1] & 0xFF) << 8) | \ - (buffer[i] & 0xFF) - - self.mac_func(t) - - t ^= self.sbuf - - buffer[i + 3] = (t >> 24) & 0xFF - buffer[i + 2] = (t >> 16) & 0xFF - buffer[i + 3] = (t >> 8) & 0xFF - buffer[i] = t & 0xFF - - i += 4 - - n &= 0x03 - - if n != 0: - self.cycle() - - self.mbuf = 0 - self.nbuf = 32 - - while self.nbuf != 0 and n != 0: - self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf) - buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff - - i += 1 - - self.nbuf -= 8 - - n -= 1 - return bytes(buffer) - - def decrypt(self, buffer: bytes, n: int = None): - if n is None: - return self.decrypt(buffer, len(buffer)) - - buffer = bytearray(buffer) - - i = 0 - j: int - t: int - - if self.nbuf != 0: - while self.nbuf != 0 and n != 0: - buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff - self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf) - - i += 1 - - self.nbuf -= 8 - - n -= 1 - - if self.nbuf != 0: - return - - self.mac_func(self.mbuf) - - j = n & ~0x03 - - while i < j: - self.cycle() - - t = ((buffer[i + 3] & 0xFF) << 24) | \ - ((buffer[i + 2] & 0xFF) << 16) | \ - ((buffer[i + 1] & 0xFF) << 8) | \ - (buffer[i] & 0xFF) - - t ^= self.sbuf - - self.mac_func(t) - - buffer[i + 3] = (t >> 24) & 0xFF - buffer[i + 2] = (t >> 16) & 0xFF - buffer[i + 1] = (t >> 8) & 0xFF - buffer[i] = t & 0xFF - - i += 4 - - n &= 0x03 - - if n != 0: - self.cycle() - - self.mbuf = 0 - self.nbuf = 32 - - while self.nbuf != 0 and n != 0: - buffer[i] ^= (self.sbuf >> (32 - self.nbuf)) & 0xff - self.mbuf ^= (buffer[i] & 0xff) << (32 - self.nbuf) - - i += 1 - - self.nbuf -= 8 - - n -= 1 - - return bytes(buffer) - - def finish(self, buffer: bytes, n: int = None): - if n is None: - return self.finish(buffer, len(buffer)) - - buffer = bytearray(buffer) - - i = 0 - j: int - - if self.nbuf != 0: - self.mac_func(self.mbuf) - - self.cycle() - self.add_key(self.initkonst ^ (self.nbuf << 3)) - - self.nbuf = 0 - - for j in range(self.n): - self.r[j] ^= self.crc[j] - - self.diffuse() - - while n > 0: - self.cycle() - - if n >= 4: - buffer[i + 3] = (self.sbuf >> 24) & 0xff - buffer[i + 2] = (self.sbuf >> 16) & 0xff - buffer[i + 1] = (self.sbuf >> 8) & 0xff - buffer[i] = self.sbuf & 0xff - - n -= 4 - i += 4 - else: - for j in range(n): - buffer[i + j] = (self.sbuf >> (i * 8)) & 0xff - break - return bytes(buffer)