You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
109 lines
3.6 KiB
109 lines
3.6 KiB
from base64 import (
|
|
b32encode,
|
|
b64decode,
|
|
)
|
|
from collections.abc import Generator
|
|
from typing import (
|
|
Any,
|
|
Dict,
|
|
List,
|
|
Union,
|
|
)
|
|
from urllib.parse import (
|
|
ParseResult,
|
|
parse_qs,
|
|
quote,
|
|
urlencode,
|
|
urlparse,
|
|
)
|
|
|
|
from .otpauth_enums import (
|
|
Algorithm,
|
|
DigitCount,
|
|
OtpType,
|
|
)
|
|
from .otpauth_migration_pb2 import Payload
|
|
|
|
|
|
SCHEME = 'otpauth-migration'
|
|
HOSTNAME = 'offline'
|
|
PAYLOAD_MARK = 'data'
|
|
EXAMPLE_PAYLOAD = 'CjEKCkhlbGxvId6tvu8SGEV4YW1wbGU6YWxpY2VAZ29vZ2xlLmNvbRoHRXhhbXBsZTAC'
|
|
EXAMPLE_MIGRATION = f'{SCHEME}://{HOSTNAME}?{PAYLOAD_MARK}={EXAMPLE_PAYLOAD}'
|
|
|
|
|
|
def is_migration_incorrect(
|
|
*,
|
|
parsed_url: ParseResult,
|
|
parsed_qs: Dict[str, Any],
|
|
) -> bool:
|
|
return (
|
|
parsed_url.scheme != SCHEME
|
|
or parsed_url.hostname != HOSTNAME
|
|
or PAYLOAD_MARK not in parsed_qs
|
|
or not isinstance(parsed_qs[PAYLOAD_MARK], list)
|
|
)
|
|
|
|
|
|
def decoded_data(data: List[str]) -> Generator:
|
|
for data_item in data:
|
|
yield b64decode(data_item)
|
|
|
|
|
|
def decode_secret(secret: bytes) -> str:
|
|
return str(b32encode(secret), 'utf-8').replace('=', '')
|
|
|
|
|
|
def get_url_params(otp: Payload.OtpParameters) -> str:
|
|
params: dict[str, Union[str, int]] = {}
|
|
|
|
if otp.algorithm:
|
|
params.update(algorithm=Algorithm.get(otp.algorithm, ''))
|
|
if otp.digits:
|
|
params.update(digits=DigitCount.get(otp.digits, ''))
|
|
if otp.issuer:
|
|
params.update(issuer=otp.issuer)
|
|
if otp.secret:
|
|
otp_secret = decode_secret(otp.secret)
|
|
params.update(secret=otp_secret)
|
|
|
|
return urlencode(params)
|
|
|
|
|
|
def get_otpauth_url(otp: Payload.OtpParameters) -> str:
|
|
otp_type = OtpType.get(otp.type, '')
|
|
otp_name = quote(otp.name)
|
|
otp_params = get_url_params(otp)
|
|
|
|
return f'otpauth://{otp_type}/{otp_name}?{otp_params}'
|
|
|
|
|
|
def validate_migration(migration: str) -> list:
|
|
url: ParseResult = urlparse(migration)
|
|
qs: Dict[str, Any] = parse_qs(url.query)
|
|
|
|
|
|
return qs[PAYLOAD_MARK]
|
|
|
|
|
|
def decoder(migration_data: list) -> list:
|
|
"""Convert Google Authenticator data to plain otpauth links"""
|
|
result = []
|
|
|
|
for payload in decoded_data(data=migration_data):
|
|
migration_payload = Payload()
|
|
migration_payload.ParseFromString(payload)
|
|
|
|
for otp_item in migration_payload.otp_parameters:
|
|
result.append(get_otpauth_url(otp_item))
|
|
|
|
return result
|
|
|
|
def start_decode_migration(migration_code: str) -> list:
|
|
payload_list = validate_migration(migration_code)
|
|
results = decoder(payload_list)
|
|
return results
|
|
|
|
if __name__ == "__main__":
|
|
start_decode_migration("otpauth-migration://offline?data=CkAKFFUlyp7lQLXJnL8Oc5%2BjVRVCEIMPEhpHb29nbGU6cHVha2Foa2luQGdtYWlsLmNvbRoGR29vZ2xlIAEoATACCj0KFEMSL8awHj%2BEFzX2cODi9rNgfvPFEhdHb29nbGU6a2Foa2luQGdtYWlsLmNvbRoGR29vZ2xlIAEoATACCi8KEBdHlsAmppdFyWD18FyPcQkSDXNhbXVlbEBhcG9sbG8aBmFwb2xsbyABKAEwAgouChQpi9Btg%2BHoQJj6tilYEsOef%2BSkiRIQc2FtdWVsQGFwaHJvZGl0ZSABKAEwAgo1ChDnUjGxj%2BJmsJJxdcrbBNpqEhBzYW11ZWxAc2FtdWVscHVhGglzYW11ZWxwdWEgASgBMAIKUAoUihAclMtkwYi3NfvbkHcHmgAi2yESJmdpdGxhYi5jb206Z2l0bGFiLmNvbV9rYWhraW5AZ21haWwuY29tGgpnaXRsYWIuY29tIAEoATACCioKCsaExzWrYHtcsLoSDkdpdEh1Yjp0ZWxib29uGgZHaXRIdWIgASgBMAIKbwooyfRfWMXsY%2B%2B%2FVvZKl9ypsl1VLAZrOPmyr3AsGxeCOuTHmGdGpt4OyBIoQW1hem9uIFdlYiBTZXJ2aWNlczp0ZWxib29uQDI2Nzk5NDAyOTgxMBoTQW1hem9uIFdlYiBTZXJ2aWNlcyABKAEwAgp%2FCiieutAtJJdKrtOMVznsVaMbvrpM%2FNvwURQSChMpPzgI2PlVshsoRk3QEjhBbWF6b24gV2ViIFNlcnZpY2VzOnJvb3QtYWNjb3VudC1tZmEtZGV2aWNlQDI2Nzk5NDAyOTgxMBoTQW1hem9uIFdlYiBTZXJ2aWNlcyABKAEwAgozCgr2b4%2BKdfJzdo%2ByEhRzYW11ZWxAd2F0Y2h0b3dyLmNvbRoJTWljcm9zb2Z0IAEoATACEAEYAiAAKKessZb6%2F%2F%2F%2F%2FwE%3D")
|
|
|