# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=too-many-lines

from io import BytesIO
from typing import (
    Any, AnyStr, AsyncGenerator, AsyncIterable, cast,
    Dict, IO, Iterable, List, Optional, Tuple, Union,
    TYPE_CHECKING
)
from urllib.parse import quote, unquote, urlparse

from ._deserialize import deserialize_blob_stream
from ._encryption import modify_user_agent_for_encryption, _ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION
from ._generated.models import (
    AppendPositionAccessConditions,
    BlobHTTPHeaders,
    BlockList,
    BlockLookupList,
    CpkInfo,
    DeleteSnapshotsOptionType,
    QueryRequest,
    SequenceNumberAccessConditions
)
from ._models import (
    BlobBlock,
    BlobProperties,
    BlobType,
    DelimitedJsonDialect,
    DelimitedTextDialect,
    PremiumPageBlobTier,
    QuickQueryDialect
)
from ._serialize import (
    get_access_conditions,
    get_cpk_scope_info,
    get_modify_conditions,
    get_source_conditions,
    serialize_blob_tags_header,
    serialize_blob_tags,
    serialize_query_format
)
from ._shared import encode_base64
from ._shared.base_client import parse_query
from ._shared.request_handlers import (
    add_metadata_headers,
    get_length,
    read_length,
    validate_and_format_range_headers
)
from ._shared.response_handlers import return_headers_and_deserialized, return_response_headers
from ._shared.uploads import IterStreamer
from ._shared.uploads_async import AsyncIterStreamer
from ._upload_helpers import _any_conditions

if TYPE_CHECKING:
    from urllib.parse import ParseResult
    from ._generated import AzureBlobStorage
    from ._models import ContentSettings
    from ._shared.models import StorageConfiguration


def _parse_url(
    account_url: str,
    container_name: str,
    blob_name: str
) -> Tuple["ParseResult", Optional[str], Optional[str]]:
    """Validate an account URL and split out SAS token and snapshot info.

    :param str account_url: The storage account URL ("https://" is prepended if no scheme).
    :param str container_name: The container name (required).
    :param str blob_name: The blob name (required).
    :returns: Tuple of (parsed URL, SAS token if any, path snapshot if any).
    :raises ValueError: if the URL is not a string or has no host, or if the
        container or blob name is missing.
    """
    try:
        has_scheme = account_url.lower().startswith('http')
    except AttributeError as exc:
        raise ValueError("Account URL must be a string.") from exc
    if not has_scheme:
        account_url = "https://" + account_url
    parsed_url = urlparse(account_url.rstrip('/'))

    if not (container_name and blob_name):
        raise ValueError("Please specify a container name and blob name.")
    if not parsed_url.netloc:
        raise ValueError(f"Invalid URL: {account_url}")

    # The query string may carry both a snapshot reference and a SAS token.
    path_snapshot, sas_token = parse_query(parsed_url.query)
    return parsed_url, sas_token, path_snapshot

def _format_url(container_name: Union[bytes, str], scheme: str, blob_name: str, query_str: str, hostname: str) -> str:
    if isinstance(container_name, str):
        container_name = container_name.encode('UTF-8')
    return f"{scheme}://{hostname}/{quote(container_name)}/{quote(blob_name, safe='~/')}{query_str}"

def _encode_source_url(source_url: str) -> str:
    parsed_source_url = urlparse(source_url)
    source_scheme = parsed_source_url.scheme
    source_hostname = parsed_source_url.netloc.rstrip('/')
    source_path = unquote(parsed_source_url.path)
    source_query = parsed_source_url.query
    result = [f"{source_scheme}://{source_hostname}{quote(source_path, safe='~/')}"]
    if source_query:
        result.append(source_query)
    return '?'.join(result)

def _upload_blob_options(  # pylint:disable=too-many-statements
    data: Union[bytes, str, Iterable[AnyStr], AsyncIterable[AnyStr], IO[bytes]],
    blob_type: Union[str, BlobType],
    length: Optional[int],
    metadata: Optional[Dict[str, str]],
    encryption_options: Dict[str, Any],
    config: "StorageConfiguration",
    sdk_moniker: str,
    client: "AzureBlobStorage",
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the keyword-argument dictionary for an upload blob operation.

    Normalizes ``data`` into a readable stream, resolves access/CPK/encryption
    settings, and selects the generated sub-client matching ``blob_type``.

    :param data: The blob content: bytes, str, a (a)sync iterable, or a file-like object.
    :param blob_type: One of BlobType.BlockBlob, PageBlob, or AppendBlob.
    :param length: Byte length of the data, or None to determine it from ``data``.
    :param metadata: Optional name-value pairs stored with the blob.
    :param encryption_options: Client-side encryption settings ('key', 'required', 'version', ...).
    :param config: The storage configuration options.
    :param sdk_moniker: The string representing the SDK package version.
    :param client: The generated Blob Storage client.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :raises ValueError: for unsupported blob type / encryption combinations.
    :raises TypeError: if ``data`` is not an accepted type.
    """
    encoding = kwargs.pop('encoding', 'UTF-8')
    if isinstance(data, str):
        data = data.encode(encoding)
    if length is None:
        length = get_length(data)
    # For in-memory bytes, truncate to the requested length up front.
    if isinstance(data, bytes):
        data = data[:length]

    # Wrap the input in a readable stream regardless of its original form.
    stream: Optional[Any] = None
    if isinstance(data, bytes):
        stream = BytesIO(data)
    elif hasattr(data, 'read'):
        stream = data
    elif hasattr(data, '__iter__') and not isinstance(data, (list, tuple, set, dict)):
        stream = IterStreamer(data, encoding=encoding)
    elif hasattr(data, '__aiter__'):
        stream = AsyncIterStreamer(cast(AsyncGenerator, data), encoding=encoding)
    else:
        raise TypeError(f"Unsupported data type: {type(data)}")

    validate_content = kwargs.pop('validate_content', False)
    content_settings = kwargs.pop('content_settings', None)
    overwrite = kwargs.pop('overwrite', False)
    max_concurrency = kwargs.pop('max_concurrency', 1)
    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)
    kwargs['cpk_info'] = cpk_info

    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    # NOTE: these helpers consume their related entries from kwargs — see _serialize.
    kwargs['lease_access_conditions'] = get_access_conditions(kwargs.pop('lease', None))
    kwargs['modified_access_conditions'] = get_modify_conditions(kwargs)
    kwargs['cpk_scope_info'] = get_cpk_scope_info(kwargs)
    if content_settings:
        kwargs['blob_headers'] = BlobHTTPHeaders(
            blob_cache_control=content_settings.cache_control,
            blob_content_type=content_settings.content_type,
            blob_content_md5=content_settings.content_md5,
            blob_content_encoding=content_settings.content_encoding,
            blob_content_language=content_settings.content_language,
            blob_content_disposition=content_settings.content_disposition
        )
    kwargs['blob_tags_string'] = serialize_blob_tags_header(kwargs.pop('tags', None))
    kwargs['stream'] = stream
    kwargs['length'] = length
    kwargs['overwrite'] = overwrite
    kwargs['headers'] = headers
    kwargs['validate_content'] = validate_content
    kwargs['blob_settings'] = config
    kwargs['max_concurrency'] = max_concurrency
    kwargs['encryption_options'] = encryption_options
    # Add feature flag to user agent for encryption
    if encryption_options['key']:
        modify_user_agent_for_encryption(
            config.user_agent_policy.user_agent,
            sdk_moniker,
            encryption_options['version'],
            kwargs)

    # Select the generated sub-client for the requested blob type, rejecting
    # encryption combinations the service/SDK does not support.
    if blob_type == BlobType.BlockBlob:
        kwargs['client'] = client.block_blob
    elif blob_type == BlobType.PageBlob:
        if (encryption_options['version'] == '2.0' and
            (encryption_options['required'] or encryption_options['key'] is not None)):
            raise ValueError("Encryption version 2.0 does not currently support page blobs.")
        kwargs['client'] = client.page_blob
    elif blob_type == BlobType.AppendBlob:
        if encryption_options['required'] or (encryption_options['key'] is not None):
            raise ValueError(_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION)
        kwargs['client'] = client.append_blob
    else:
        raise ValueError(f"Unsupported BlobType: {blob_type}")
    return kwargs

def _upload_blob_from_url_options(source_url: str, **kwargs: Any) -> Dict[str, Any]:
    """Build the options for an upload-from-URL (Put Blob From URL) operation.

    :param str source_url: URL of the source blob; percent-encoding is normalized.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :rtype: Dict[str, Any]
    """
    metadata = kwargs.pop('metadata', None)
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    source_url = _encode_source_url(source_url=source_url)
    tier = kwargs.pop('standard_blob_tier', None)
    overwrite = kwargs.pop('overwrite', False)
    content_settings = kwargs.pop('content_settings', None)
    source_authorization = kwargs.pop('source_authorization', None)
    if content_settings:
        kwargs['blob_http_headers'] = BlobHTTPHeaders(
            blob_cache_control=content_settings.cache_control,
            blob_content_type=content_settings.content_type,
            # Destination content MD5 is intentionally unset here; the
            # source_content_md5 option below is passed separately.
            blob_content_md5=None,
            blob_content_encoding=content_settings.content_encoding,
            blob_content_language=content_settings.content_language,
            blob_content_disposition=content_settings.content_disposition
        )
    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    options = {
        'copy_source_authorization': source_authorization,
        'content_length': 0,
        'copy_source_blob_properties': kwargs.pop('include_source_blob_properties', True),
        'source_content_md5': kwargs.pop('source_content_md5', None),
        'copy_source': source_url,
        'modified_access_conditions': get_modify_conditions(kwargs),
        'blob_tags_string': serialize_blob_tags_header(kwargs.pop('tags', None)),
        'cls': return_response_headers,
        'lease_access_conditions': get_access_conditions(kwargs.pop('destination_lease', None)),
        'tier': tier.value if tier else None,
        'source_modified_access_conditions': get_source_conditions(kwargs),
        'cpk_info': cpk_info,
        'cpk_scope_info': get_cpk_scope_info(kwargs),
        'headers': headers,
    }
    options.update(kwargs)
    # When not overwriting and the caller supplied no explicit conditions,
    # require that the destination blob does not already exist.
    if not overwrite and not _any_conditions(**options):
        options['modified_access_conditions'].if_none_match = '*'
    return options

def _download_blob_options(
    blob_name: str,
    container_name: str,
    version_id: Optional[str],
    offset: Optional[int],
    length: Optional[int],
    encoding: Optional[str],
    encryption_options: Dict[str, Any],
    config: "StorageConfiguration",
    sdk_moniker: str,
    client: "AzureBlobStorage",
    **kwargs
) -> Dict[str, Any]:
    """Creates a dictionary containing the options for a download blob operation.

    :param str blob_name:
        The name of the blob.
    :param str container_name:
        The name of the container.
    :param Optional[str] version_id:
        The version id parameter is a value that, when present, specifies the version of the blob to download.
    :param Optional[int] offset:
        Start of byte range to use for downloading a section of the blob. Must be set if length is provided.
    :param Optional[int] length:
        Number of bytes to read from the stream. This is optional, but should be supplied for optimal performance.
    :param Optional[str] encoding:
        Encoding to decode the downloaded bytes. Default is None, i.e. no decoding.
    :param Dict[str, Any] encryption_options:
        The options for encryption, if enabled.
    :param StorageConfiguration config:
        The Storage configuration options.
    :param str sdk_moniker:
        The string representing the SDK package version.
    :param AzureBlobStorage client:
        The generated Blob Storage client.
    :returns: A dictionary containing the download blob options.
    :rtype: Dict[str, Any]
    :raises ValueError: if length is provided without an offset.
    """
    if length is not None:
        if offset is None:
            raise ValueError("Offset must be provided if length is provided.")
        length = offset + length - 1  # Service actually uses an end-range inclusive index

    validate_content = kwargs.pop('validate_content', False)
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)

    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    # Add feature flag to user agent for encryption
    if encryption_options['key'] or encryption_options['resolver']:
        modify_user_agent_for_encryption(
            config.user_agent_policy.user_agent,
            sdk_moniker,
            encryption_options['version'],
            kwargs)

    options = {
        'clients': client,
        'config': config,
        'start_range': offset,
        'end_range': length,  # inclusive end index computed above (or None)
        'version_id': version_id,
        'validate_content': validate_content,
        'encryption_options': {
            'required': encryption_options['required'],
            'key': encryption_options['key'],
            'resolver': encryption_options['resolver']},
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'cpk_info': cpk_info,
        # Allow the caller to override deserialization via 'cls'.
        'download_cls': kwargs.pop('cls', None) or deserialize_blob_stream,
        'max_concurrency':kwargs.pop('max_concurrency', 1),
        'encoding': encoding,
        'timeout': kwargs.pop('timeout', None),
        'name': blob_name,
        'container': container_name}
    options.update(kwargs)
    return options

def _quick_query_options(snapshot: Optional[str], query_expression: str, **kwargs: Any ) -> Tuple[Dict[str, Any], str]:
    """Build the options for a blob quick query (Query Blob Contents) call.

    :param Optional[str] snapshot: Snapshot of the blob to query, if any.
    :param str query_expression: The query expression to execute.
    :returns: A tuple of (options dictionary, record delimiter). The delimiter
        comes from the output format when one supplies it, otherwise the input
        format, and defaults to a newline.
    :rtype: Tuple[Dict[str, Any], str]
    :raises ValueError: if blob_format is an unsupported type, or if
        ParquetDialect is requested as an output format.
    """
    delimiter = '\n'
    input_format = kwargs.pop('blob_format', None)
    # Normalize QuickQueryDialect enum values into their dialect objects.
    if input_format == QuickQueryDialect.DelimitedJson:
        input_format = DelimitedJsonDialect()
    if input_format == QuickQueryDialect.DelimitedText:
        input_format = DelimitedTextDialect()
    input_parquet_format = input_format == "ParquetDialect"
    if input_format and not input_parquet_format:
        # Prefer lineterminator (text dialect); fall back to delimiter (JSON dialect).
        try:
            delimiter = input_format.lineterminator
        except AttributeError:
            try:
                delimiter = input_format.delimiter
            except AttributeError as exc:
                raise ValueError("The Type of blob_format can only be DelimitedTextDialect or "
                                    "DelimitedJsonDialect or ParquetDialect") from exc
    output_format = kwargs.pop('output_format', None)
    if output_format == QuickQueryDialect.DelimitedJson:
        output_format = DelimitedJsonDialect()
    if output_format == QuickQueryDialect.DelimitedText:
        output_format = DelimitedTextDialect()
    if output_format:
        if output_format == "ParquetDialect":
            raise ValueError("ParquetDialect is invalid as an output format.")
        # An explicit output format's delimiter overrides the input one.
        try:
            delimiter = output_format.lineterminator
        except AttributeError:
            try:
                delimiter = output_format.delimiter
            except AttributeError:
                pass
    else:
        # Default the output format to the input format, except Parquet which
        # cannot be used for output.
        output_format = input_format if not input_parquet_format else None
    query_request = QueryRequest(
        expression=query_expression,
        input_serialization=serialize_query_format(input_format),
        output_serialization=serialize_query_format(output_format)
    )
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)

    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(
            encryption_key=cpk.key_value,
            encryption_key_sha256=cpk.key_hash,
            encryption_algorithm=cpk.algorithm
        )
    options = {
        'query_request': query_request,
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'cpk_info': cpk_info,
        'snapshot': snapshot,
        'timeout': kwargs.pop('timeout', None),
        'cls': return_headers_and_deserialized,
    }
    options.update(kwargs)
    return options, delimiter

def _generic_delete_blob_options(delete_snapshots: Optional[str] = None, **kwargs: Any) -> Dict[str, Any]:
    """Build the option dictionary shared by the blob delete operations."""
    lease_conditions = get_access_conditions(kwargs.pop('lease', None))
    modified_conditions = get_modify_conditions(kwargs)
    snapshots_option = DeleteSnapshotsOptionType(delete_snapshots) if delete_snapshots else None
    options = {
        'timeout': kwargs.pop('timeout', None),
        'snapshot': kwargs.pop('snapshot', None),  # this is added for delete_blobs
        'delete_snapshots': snapshots_option,
        'lease_access_conditions': lease_conditions,
        'modified_access_conditions': modified_conditions}
    options.update(kwargs)
    return options

def _delete_blob_options(
    snapshot: Optional[str],
    version_id: Optional[str],
    delete_snapshots: Optional[str] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the options for deleting a single blob, snapshot, or version."""
    # Targeting a specific snapshot and asking to delete snapshots are mutually exclusive.
    if snapshot and delete_snapshots:
        raise ValueError("The delete_snapshots option cannot be used with a specific snapshot.")
    options = _generic_delete_blob_options(delete_snapshots, **kwargs)
    options.update({
        'snapshot': snapshot,
        'version_id': version_id,
        'blob_delete_type': kwargs.pop('blob_delete_type', None),
    })
    return options

def _set_http_headers_options(content_settings: Optional["ContentSettings"] = None, **kwargs: Any) -> Dict[str, Any]:
    """Build the options for a set-HTTP-headers operation."""
    lease_conditions = get_access_conditions(kwargs.pop('lease', None))
    modified_conditions = get_modify_conditions(kwargs)
    if content_settings:
        blob_headers = BlobHTTPHeaders(
            blob_cache_control=content_settings.cache_control,
            blob_content_type=content_settings.content_type,
            blob_content_md5=content_settings.content_md5,
            blob_content_encoding=content_settings.content_encoding,
            blob_content_language=content_settings.content_language,
            blob_content_disposition=content_settings.content_disposition
        )
    else:
        blob_headers = None
    options = {
        'timeout': kwargs.pop('timeout', None),
        'blob_http_headers': blob_headers,
        'lease_access_conditions': lease_conditions,
        'modified_access_conditions': modified_conditions,
        'cls': return_response_headers}
    options.update(kwargs)
    return options

def _set_blob_metadata_options(metadata: Optional[Dict[str, str]] = None, **kwargs: Any):
    """Build the options for a set-blob-metadata operation."""
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    lease_conditions = get_access_conditions(kwargs.pop('lease', None))
    modified_conditions = get_modify_conditions(kwargs)
    scope_info = get_cpk_scope_info(kwargs)

    # Customer-provided key (CPK) for service-side encryption, if supplied.
    customer_key = kwargs.pop('cpk', None)
    if customer_key:
        cpk_info = CpkInfo(
            encryption_key=customer_key.key_value,
            encryption_key_sha256=customer_key.key_hash,
            encryption_algorithm=customer_key.algorithm
        )
    else:
        cpk_info = None
    options = {
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': lease_conditions,
        'modified_access_conditions': modified_conditions,
        'cpk_scope_info': scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers,
        'headers': headers}
    options.update(kwargs)
    return options

def _create_page_blob_options(
    size: int,
    content_settings: Optional["ContentSettings"] = None,
    metadata: Optional[Dict[str, str]] = None,
    premium_page_blob_tier: Optional[Union[str, "PremiumPageBlobTier"]] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the options for creating a page blob.

    :param int size: Size of the page blob, in bytes.
    :param content_settings: Optional HTTP header settings for the blob.
    :param metadata: Optional name-value pairs stored with the blob.
    :param premium_page_blob_tier: Premium tier, as an enum value or raw string.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :rtype: Dict[str, Any]
    """
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    blob_headers = None
    if content_settings:
        blob_headers = BlobHTTPHeaders(
            blob_cache_control=content_settings.cache_control,
            blob_content_type=content_settings.content_type,
            blob_content_md5=content_settings.content_md5,
            blob_content_encoding=content_settings.content_encoding,
            blob_content_language=content_settings.content_language,
            blob_content_disposition=content_settings.content_disposition
        )

    sequence_number = kwargs.pop('sequence_number', None)
    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    immutability_policy = kwargs.pop('immutability_policy', None)
    if immutability_policy:
        kwargs['immutability_policy_expiry'] = immutability_policy.expiry_time
        kwargs['immutability_policy_mode'] = immutability_policy.policy_mode

    # Accept either the tier enum (use its .value) or a raw tier string.
    tier = None
    if premium_page_blob_tier:
        try:
            tier = premium_page_blob_tier.value  # type: ignore
        except AttributeError:
            tier = premium_page_blob_tier  # type: ignore

    blob_tags_string = serialize_blob_tags_header(kwargs.pop('tags', None))

    options = {
        'content_length': 0,
        'blob_content_length': size,
        'blob_sequence_number': sequence_number,
        'blob_http_headers': blob_headers,
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'blob_tags_string': blob_tags_string,
        'cls': return_response_headers,
        "tier": tier,
        'headers': headers}
    options.update(kwargs)
    return options

def _create_append_blob_options(
    content_settings: Optional["ContentSettings"] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the options for creating an (initially empty) append blob.

    :param content_settings: Optional HTTP header settings for the blob.
    :param metadata: Optional name-value pairs stored with the blob.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :rtype: Dict[str, Any]
    """
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    blob_headers = None
    if content_settings:
        blob_headers = BlobHTTPHeaders(
            blob_cache_control=content_settings.cache_control,
            blob_content_type=content_settings.content_type,
            blob_content_md5=content_settings.content_md5,
            blob_content_encoding=content_settings.content_encoding,
            blob_content_language=content_settings.content_language,
            blob_content_disposition=content_settings.content_disposition
        )

    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    immutability_policy = kwargs.pop('immutability_policy', None)
    if immutability_policy:
        kwargs['immutability_policy_expiry'] = immutability_policy.expiry_time
        kwargs['immutability_policy_mode'] = immutability_policy.policy_mode

    blob_tags_string = serialize_blob_tags_header(kwargs.pop('tags', None))

    options = {
        'content_length': 0,  # an append blob is created empty
        'blob_http_headers': blob_headers,
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'blob_tags_string': blob_tags_string,
        'cls': return_response_headers,
        'headers': headers}
    options.update(kwargs)
    return options

def _create_snapshot_options(metadata: Optional[Dict[str, str]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Build the options for creating a blob snapshot."""
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    lease_conditions = get_access_conditions(kwargs.pop('lease', None))
    modified_conditions = get_modify_conditions(kwargs)
    scope_info = get_cpk_scope_info(kwargs)

    # Customer-provided key (CPK) for service-side encryption, if supplied.
    customer_key = kwargs.pop('cpk', None)
    if customer_key:
        cpk_info = CpkInfo(
            encryption_key=customer_key.key_value,
            encryption_key_sha256=customer_key.key_hash,
            encryption_algorithm=customer_key.algorithm
        )
    else:
        cpk_info = None

    options = {
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': lease_conditions,
        'modified_access_conditions': modified_conditions,
        'cpk_scope_info': scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers,
        'headers': headers}
    options.update(kwargs)
    return options

def _start_copy_from_url_options(  # pylint:disable=too-many-statements
    source_url: str,
    metadata: Optional[Dict[str, str]] = None,
    incremental_copy: bool = False,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the options for a start-copy-from-URL operation (sync or async copy).

    :param str source_url: URL of the source blob; percent-encoding is normalized.
    :param metadata: Optional name-value pairs stored with the destination blob.
    :param bool incremental_copy: Whether this is an incremental page blob copy.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :rtype: Dict[str, Any]
    :raises ValueError: for options that are only valid with requires_sync=True,
        or options not applicable to incremental copies.
    """
    source_url = _encode_source_url(source_url=source_url)
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    if 'source_lease' in kwargs:
        # Accept either a lease object (use its id) or a raw lease id string.
        source_lease = kwargs.pop('source_lease')
        try:
            headers['x-ms-source-lease-id'] = source_lease.id
        except AttributeError:
            headers['x-ms-source-lease-id'] = source_lease

    tier = kwargs.pop('premium_page_blob_tier', None) or kwargs.pop('standard_blob_tier', None)
    tags = kwargs.pop('tags', None)

    # Options only available for sync copy
    requires_sync = kwargs.pop('requires_sync', None)
    encryption_scope_str = kwargs.pop('encryption_scope', None)
    source_authorization = kwargs.pop('source_authorization', None)
    # If tags is a str, interpret that as copy_source_tags
    copy_source_tags = isinstance(tags, str)

    if incremental_copy:
        if source_authorization:
            raise ValueError("Source authorization tokens are not applicable for incremental copying.")
        if copy_source_tags:
            raise ValueError("Copying source tags is not applicable for incremental copying.")

    # TODO: refactor start_copy_from_url api in _blob_client.py. Call _generated/_blob_operations.py copy_from_url
    #  when requires_sync=True is set.
    #  Currently both sync copy and async copy are calling _generated/_blob_operations.py start_copy_from_url.
    #  As sync copy diverges more from async copy, more problem will surface.
    if requires_sync is True:
        headers['x-ms-requires-sync'] = str(requires_sync)
        if encryption_scope_str:
            headers['x-ms-encryption-scope'] = encryption_scope_str
        if source_authorization:
            headers['x-ms-copy-source-authorization'] = source_authorization
        if copy_source_tags:
            headers['x-ms-copy-source-tag-option'] = tags
    else:
        # These features are rejected (rather than silently ignored) when the
        # caller did not request a synchronous copy.
        if encryption_scope_str:
            raise ValueError(
                "Encryption_scope is only supported for sync copy, please specify requires_sync=True")
        if source_authorization:
            raise ValueError(
                "Source authorization tokens are only supported for sync copy, please specify requires_sync=True")
        if copy_source_tags:
            raise ValueError(
                "Copying source tags is only supported for sync copy, please specify requires_sync=True")

    timeout = kwargs.pop('timeout', None)
    dest_mod_conditions = get_modify_conditions(kwargs)
    # When tags were given as a string they travel via header above, not here.
    blob_tags_string = serialize_blob_tags_header(tags) if not copy_source_tags else None

    immutability_policy = kwargs.pop('immutability_policy', None)
    if immutability_policy:
        kwargs['immutability_policy_expiry'] = immutability_policy.expiry_time
        kwargs['immutability_policy_mode'] = immutability_policy.policy_mode

    options = {
        'copy_source': source_url,
        'seal_blob': kwargs.pop('seal_destination_blob', None),
        'timeout': timeout,
        'modified_access_conditions': dest_mod_conditions,
        'blob_tags_string': blob_tags_string,
        'headers': headers,
        'cls': return_response_headers,
    }
    # Source conditions, destination lease and tier do not apply to incremental copies.
    if not incremental_copy:
        source_mod_conditions = get_source_conditions(kwargs)
        dest_access_conditions = get_access_conditions(kwargs.pop('destination_lease', None))
        options['source_modified_access_conditions'] = source_mod_conditions
        options['lease_access_conditions'] = dest_access_conditions
        options['tier'] = tier.value if tier else None
    options.update(kwargs)
    return options

def _abort_copy_options(copy_id: Union[str, Dict[str, Any], BlobProperties], **kwargs: Any) -> Dict[str, Any]:
    """Build the options for aborting a pending copy.

    Accepts the copy id as a plain string, a properties dictionary with a
    'copy_id' entry, or a BlobProperties instance.
    """
    lease_conditions = get_access_conditions(kwargs.pop('lease', None))
    if isinstance(copy_id, BlobProperties):
        abort_id = copy_id.copy.id  # type: ignore [union-attr]
    elif isinstance(copy_id, dict):
        abort_id = copy_id['copy_id']
    else:
        abort_id = copy_id
    options = {
        'copy_id': abort_id,
        'lease_access_conditions': lease_conditions,
        'timeout': kwargs.pop('timeout', None)}
    options.update(kwargs)
    return options

def _stage_block_options(
    block_id: str,
    data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]],
    length: Optional[int] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the options for staging a block (Put Block).

    :param str block_id: The id for the block; base64-encoded before sending.
    :param data: The block content.
    :param length: Byte length of the data, or None to determine it from ``data``.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :rtype: Dict[str, Any]
    """
    block_id = encode_base64(str(block_id))
    if isinstance(data, str):
        data = data.encode(kwargs.pop('encoding', 'UTF-8'))  # type: ignore
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    if length is None:
        length = get_length(data)
        if length is None:
            # Length could not be determined without consuming the data;
            # read_length returns both the length and the (possibly read) data.
            length, data = read_length(data)
    if isinstance(data, bytes):
        data = data[:length]

    validate_content = kwargs.pop('validate_content', False)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    options = {
        'block_id': block_id,
        'content_length': length,
        'body': data,
        'transactional_content_md5': None,
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': access_conditions,
        'validate_content': validate_content,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _stage_block_from_url_options(
    block_id: str,
    source_url: str,
    source_offset: Optional[int] = None,
    source_length: Optional[int] = None,
    source_content_md5: Optional[Union[bytes, bytearray]] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build the options for staging a block from a URL (Put Block From URL).

    :param str block_id: The id for the block; base64-encoded before sending.
    :param str source_url: URL of the source blob; percent-encoding is normalized.
    :param source_offset: Start of the byte range to copy from the source.
        Required when source_length is given.
    :param source_length: Number of bytes to copy from the source.
    :param source_content_md5: Optional MD5 of the source range for validation.
    :returns: The assembled options dictionary (any remaining kwargs pass through).
    :rtype: Dict[str, Any]
    :raises ValueError: if source_length is set without source_offset.
    """
    source_url = _encode_source_url(source_url=source_url)
    source_authorization = kwargs.pop('source_authorization', None)
    if source_length is not None and source_offset is None:
        raise ValueError("Source offset value must not be None if length is set.")
    if source_length is not None and source_offset is not None:
        source_length = source_offset + source_length - 1  # service uses an inclusive end index
    block_id = encode_base64(str(block_id))
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    range_header = None
    if source_offset is not None:
        range_header, _ = validate_and_format_range_headers(source_offset, source_length)

    cpk_scope_info = get_cpk_scope_info(kwargs)
    # Customer-provided key (CPK) for service-side encryption, if supplied.
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)
    options = {
        'copy_source_authorization': source_authorization,
        'block_id': block_id,
        'content_length': 0,
        'source_url': source_url,
        'source_range': range_header,
        'source_content_md5': bytearray(source_content_md5) if source_content_md5 else None,
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': access_conditions,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _get_block_list_result(blocks: BlockList) -> Tuple[List[BlobBlock], List[BlobBlock]]:
    """Convert a generated BlockList into (committed, uncommitted) BlobBlock lists."""
    def _convert(generated_blocks):
        # Treat a missing/empty collection as no blocks.
        if not generated_blocks:
            return []
        return [BlobBlock._from_generated(b) for b in generated_blocks]  # pylint: disable=protected-access

    return _convert(blocks.committed_blocks), _convert(blocks.uncommitted_blocks)

def _commit_block_list_options(
    block_list: List[BlobBlock],
    content_settings: Optional["ContentSettings"] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for a Put Block List (commit) call.

    :param block_list: Blocks to commit. BlobBlock entries are bucketed by
        their state ('committed' / 'uncommitted' / otherwise 'latest');
        any non-BlobBlock entry is treated as a bare id in the 'latest' bucket.
    :param content_settings: Optional HTTP content headers to set on the blob.
    :param metadata: Optional name/value metadata pairs for the blob.
    :returns: Keyword arguments for the generated commit_block_list operation.
    """
    block_lookup = BlockLookupList(committed=[], uncommitted=[], latest=[])
    for block in block_list:
        if isinstance(block, BlobBlock):
            # Route the base64-encoded block id into the bucket matching its state.
            if block.state.value == 'committed':
                cast(List[str], block_lookup.committed).append(encode_base64(str(block.id)))
            elif block.state.value == 'uncommitted':
                cast(List[str], block_lookup.uncommitted).append(encode_base64(str(block.id)))
            elif block_lookup.latest is not None:
                block_lookup.latest.append(encode_base64(str(block.id)))
        else:
            # Bare ids (strings) default to the 'latest' bucket.
            block_lookup.latest.append(encode_base64(str(block)))
    headers = kwargs.pop('headers', {})
    headers.update(add_metadata_headers(metadata))
    blob_headers = None
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    # NOTE: get_modify_conditions pops its keys from kwargs, so it must run
    # before options.update(kwargs) below.
    mod_conditions = get_modify_conditions(kwargs)
    if content_settings:
        blob_headers = BlobHTTPHeaders(
            blob_cache_control=content_settings.cache_control,
            blob_content_type=content_settings.content_type,
            blob_content_md5=content_settings.content_md5,
            blob_content_encoding=content_settings.content_encoding,
            blob_content_language=content_settings.content_language,
            blob_content_disposition=content_settings.content_disposition
        )

    validate_content = kwargs.pop('validate_content', False)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        # Customer-provided key for service-side encryption.
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    immutability_policy = kwargs.pop('immutability_policy', None)
    if immutability_policy:
        # Flattened into the two keyword arguments the generated layer expects.
        kwargs['immutability_policy_expiry'] = immutability_policy.expiry_time
        kwargs['immutability_policy_mode'] = immutability_policy.policy_mode

    tier = kwargs.pop('standard_blob_tier', None)
    blob_tags_string = serialize_blob_tags_header(kwargs.pop('tags', None))

    options = {
        'blocks': block_lookup,
        'blob_http_headers': blob_headers,
        'lease_access_conditions': access_conditions,
        'timeout': kwargs.pop('timeout', None),
        'modified_access_conditions': mod_conditions,
        'cls': return_response_headers,
        'validate_content': validate_content,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'tier': tier.value if tier else None,
        'blob_tags_string': blob_tags_string,
        'headers': headers
    }
    options.update(kwargs)
    return options

def _set_blob_tags_options(
    version_id: Optional[str],
    tags: Optional[Dict[str, str]] = None,
    **kwargs: Any
)-> Dict[str, Any]:
    """Build keyword options for a Set Blob Tags call.

    :param version_id: Targets a specific blob version, if given.
    :param tags: Tag name/value pairs to serialize onto the request.
    :returns: Keyword arguments for the generated set_tags operation.
    """
    lease = kwargs.pop('lease', None)
    # get_modify_conditions pops its keys from kwargs before the final update.
    options: Dict[str, Any] = {
        'tags': serialize_blob_tags(tags),
        'lease_access_conditions': get_access_conditions(lease),
        'modified_access_conditions': get_modify_conditions(kwargs),
        'version_id': version_id,
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _get_blob_tags_options(version_id: Optional[str], snapshot: Optional[str], **kwargs: Any) -> Dict[str, Any]:
    """Build keyword options for a Get Blob Tags call.

    :param version_id: Targets a specific blob version, if given.
    :param snapshot: Targets a specific blob snapshot, if given.
    :returns: Keyword arguments for the generated get_tags operation.
    """
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)

    options = {
        'version_id': version_id,
        'snapshot': snapshot,
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'timeout': kwargs.pop('timeout', None),
        'cls': return_headers_and_deserialized}
    # Forward any remaining keyword arguments, matching every other *_options
    # helper in this module (previously leftover kwargs were silently dropped).
    options.update(kwargs)
    return options

def _get_page_ranges_options(
    snapshot: Optional[str],
    offset: Optional[int] = None,
    length: Optional[int] = None,
    previous_snapshot_diff: Optional[Union[str, Dict[str, Any]]] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for a Get Page Ranges (or ranges-diff) call.

    :param snapshot: Snapshot to inspect, if any.
    :param offset: Start of the byte range; required when length is given.
    :param length: Number of bytes in the range.
    :param previous_snapshot_diff: Baseline snapshot for a diff — a string,
        a {'snapshot': ...} dict, or an object with a .snapshot attribute.
    :returns: Keyword arguments for the generated page-ranges operation.
    """
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)
    if length is not None:
        if offset is None:
            raise ValueError("Offset value must not be None if length is set.")
        # The service expects an inclusive end index, not a length.
        length = offset + length - 1
    page_range, _ = validate_and_format_range_headers(
        offset, length, start_range_required=False, end_range_required=False, align_to_page=True
    )
    options: Dict[str, Any] = {
        'snapshot': snapshot,
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'timeout': kwargs.pop('timeout', None),
        'range': page_range,
    }
    if previous_snapshot_diff:
        try:
            # BlobProperties-like object exposing .snapshot.
            options['prevsnapshot'] = previous_snapshot_diff.snapshot # type: ignore
        except AttributeError:
            try:
                # Dict-style {'snapshot': ...}.
                options['prevsnapshot'] = previous_snapshot_diff['snapshot'] # type: ignore
            except TypeError:
                # Plain snapshot string.
                options['prevsnapshot'] = previous_snapshot_diff
    options.update(kwargs)
    return options

def _set_sequence_number_options(
    sequence_number_action: str,
    sequence_number: Optional[str] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for a Set Sequence Number call on a page blob.

    :param sequence_number_action: Required action ('max', 'update', 'increment').
    :param sequence_number: New sequence number, where the action requires one.
    :returns: Keyword arguments for the generated update_sequence_number operation.
    :raises ValueError: If no action is supplied.
    """
    lease_conditions = get_access_conditions(kwargs.pop('lease', None))
    modified_conditions = get_modify_conditions(kwargs)
    if sequence_number_action is None:
        raise ValueError("A sequence number action must be specified")
    options: Dict[str, Any] = {
        'sequence_number_action': sequence_number_action,
        'timeout': kwargs.pop('timeout', None),
        'blob_sequence_number': sequence_number,
        'lease_access_conditions': lease_conditions,
        'modified_access_conditions': modified_conditions,
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _resize_blob_options(size: int, **kwargs: Any) -> Dict[str, Any]:
    """Build keyword options for a Resize call on a page blob.

    :param size: New total content length for the blob, in bytes.
    :returns: Keyword arguments for the generated resize operation.
    :raises ValueError: If size is None.
    """
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)
    if size is None:
        raise ValueError("A content length must be specified for a Page Blob.")

    cpk = kwargs.pop('cpk', None)
    # Customer-provided key for service-side encryption, if supplied.
    cpk_info = CpkInfo(
        encryption_key=cpk.key_value,
        encryption_key_sha256=cpk.key_hash,
        encryption_algorithm=cpk.algorithm,
    ) if cpk else None
    options: Dict[str, Any] = {
        'blob_content_length': size,
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': access_conditions,
        'modified_access_conditions': mod_conditions,
        'cpk_info': cpk_info,
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _upload_page_options(
    page: Union[bytes, str],
    offset: int,
    length: int,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for an Upload Pages call.

    :param page: Page content; a str is encoded using the 'encoding' kwarg
        (default UTF-8) before upload.
    :param offset: Start of the destination range; must be 512-aligned.
    :param length: Number of bytes to write; must be 512-aligned.
    :returns: Keyword arguments for the generated upload_pages operation.
    :raises ValueError: If offset or length is None or not 512-aligned.
    """
    if isinstance(page, str):
        page = page.encode(kwargs.pop('encoding', 'UTF-8'))
    if offset is None or offset % 512 != 0:
        raise ValueError("offset must be an integer that aligns with 512 page size")
    if length is None or length % 512 != 0:
        raise ValueError("length must be an integer that aligns with 512 page size")
    end_range = offset + length - 1  # Reformat to an inclusive range index
    content_range = f'bytes={offset}-{end_range}'
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    seq_conditions = SequenceNumberAccessConditions(
        if_sequence_number_less_than_or_equal_to=kwargs.pop('if_sequence_number_lte', None),
        if_sequence_number_less_than=kwargs.pop('if_sequence_number_lt', None),
        if_sequence_number_equal_to=kwargs.pop('if_sequence_number_eq', None)
    )
    mod_conditions = get_modify_conditions(kwargs)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    validate_content = kwargs.pop('validate_content', False)
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        # Customer-provided key for service-side encryption.
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)
    options = {
        # Trim the body so it matches the declared content_length exactly.
        'body': page[:length],
        'content_length': length,
        'transactional_content_md5': None,
        'timeout': kwargs.pop('timeout', None),
        'range': content_range,
        'lease_access_conditions': access_conditions,
        'sequence_number_access_conditions': seq_conditions,
        'modified_access_conditions': mod_conditions,
        'validate_content': validate_content,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers}
    options.update(kwargs)
    return options

def _upload_pages_from_url_options(
    source_url: str,
    offset: int,
    length: int,
    source_offset: int,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for an Upload Pages From URL call.

    :param source_url: URL of the source blob to copy pages from.
    :param offset: Start of the destination range; must be 512-aligned.
    :param length: Number of bytes to copy; must be 512-aligned.
    :param source_offset: Start of the source range; must be 512-aligned.
    :returns: Keyword arguments for the generated upload_pages_from_url operation.
    :raises ValueError: If any offset/length is None or not 512-aligned.
    """
    source_url = _encode_source_url(source_url=source_url)
    # TODO: extract the code to a method format_range
    if offset is None or offset % 512 != 0:
        raise ValueError("offset must be an integer that aligns with 512 page size")
    if length is None or length % 512 != 0:
        raise ValueError("length must be an integer that aligns with 512 page size")
    # Fixed: previously this re-checked `offset` instead of `source_offset`,
    # so an unaligned source_offset slipped through to the service.
    if source_offset is None or source_offset % 512 != 0:
        raise ValueError("source_offset must be an integer that aligns with 512 page size")

    # Format both ranges as inclusive byte indices (hence the -1),
    # e.g. 'bytes=0-511' for the first page.
    end_range = offset + length - 1
    destination_range = f'bytes={offset}-{end_range}'
    source_range = f'bytes={source_offset}-{source_offset + length - 1}'

    seq_conditions = SequenceNumberAccessConditions(
        if_sequence_number_less_than_or_equal_to=kwargs.pop('if_sequence_number_lte', None),
        if_sequence_number_less_than=kwargs.pop('if_sequence_number_lt', None),
        if_sequence_number_equal_to=kwargs.pop('if_sequence_number_eq', None)
    )
    source_authorization = kwargs.pop('source_authorization', None)
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)
    source_mod_conditions = get_source_conditions(kwargs)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    source_content_md5 = kwargs.pop('source_content_md5', None)
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        # Customer-provided key for service-side encryption.
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)

    options = {
        'copy_source_authorization': source_authorization,
        'source_url': source_url,
        'content_length': 0,
        'source_range': source_range,
        'range': destination_range,
        'source_content_md5': bytearray(source_content_md5) if source_content_md5 else None,
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': access_conditions,
        'sequence_number_access_conditions': seq_conditions,
        'modified_access_conditions': mod_conditions,
        'source_modified_access_conditions': source_mod_conditions,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers}
    options.update(kwargs)
    return options

def _clear_page_options(
    offset: int,
    length: int,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for a Clear Pages call over a 512-aligned range.

    :param offset: Start of the range to clear; must be 512-aligned.
    :param length: Number of bytes to clear; must be 512-aligned.
    :returns: Keyword arguments for the generated clear_pages operation.
    :raises ValueError: If offset or length is None or not 512-aligned.
    """
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    seq_conditions = SequenceNumberAccessConditions(
        if_sequence_number_less_than_or_equal_to=kwargs.pop('if_sequence_number_lte', None),
        if_sequence_number_less_than=kwargs.pop('if_sequence_number_lt', None),
        if_sequence_number_equal_to=kwargs.pop('if_sequence_number_eq', None)
    )
    mod_conditions = get_modify_conditions(kwargs)
    if offset is None or offset % 512 != 0:
        raise ValueError("offset must be an integer that aligns with 512 page size")
    if length is None or length % 512 != 0:
        raise ValueError("length must be an integer that aligns with 512 page size")
    last_byte = offset + length - 1  # inclusive end index of the range
    content_range = f'bytes={offset}-{last_byte}'

    cpk = kwargs.pop('cpk', None)
    # Customer-provided key for service-side encryption, if supplied.
    cpk_info = CpkInfo(
        encryption_key=cpk.key_value,
        encryption_key_sha256=cpk.key_hash,
        encryption_algorithm=cpk.algorithm,
    ) if cpk else None

    options: Dict[str, Any] = {
        'content_length': 0,
        'timeout': kwargs.pop('timeout', None),
        'range': content_range,
        'lease_access_conditions': access_conditions,
        'sequence_number_access_conditions': seq_conditions,
        'modified_access_conditions': mod_conditions,
        'cpk_info': cpk_info,
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _append_block_options(
    data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]],
    length: Optional[int] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for an Append Block call.

    :param data: Block content. A str is encoded (default UTF-8); streams and
        iterables have their length measured, or are read to determine it.
    :param length: Size of the data in bytes; derived from *data* when None.
    :returns: Keyword arguments for the generated append_block operation, or
        an empty dict when the length is 0 (nothing to append — presumably
        the caller short-circuits on {}; confirm at the call site).
    """
    if isinstance(data, str):
        data = data.encode(kwargs.pop('encoding', 'UTF-8'))
    if length is None:
        length = get_length(data)
        if length is None:
            # Length could not be determined without consuming the source.
            length, data = read_length(data)
    if length == 0:
        return {}
    if isinstance(data, bytes):
        # Trim so the body matches the declared content_length exactly.
        data = data[:length]

    appendpos_condition = kwargs.pop('appendpos_condition', None)
    maxsize_condition = kwargs.pop('maxsize_condition', None)
    validate_content = kwargs.pop('validate_content', False)
    append_conditions = None
    # NOTE(review): parses as `maxsize_condition or (appendpos_condition is not None)`,
    # so a falsy maxsize_condition (e.g. 0) alone does not build conditions.
    if maxsize_condition or appendpos_condition is not None:
        append_conditions = AppendPositionAccessConditions(
            max_size=maxsize_condition,
            append_position=appendpos_condition
        )
    access_conditions = get_access_conditions(kwargs.pop('lease', None))
    mod_conditions = get_modify_conditions(kwargs)
    cpk_scope_info = get_cpk_scope_info(kwargs)
    cpk = kwargs.pop('cpk', None)
    cpk_info = None
    if cpk:
        # Customer-provided key for service-side encryption.
        cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
                            encryption_algorithm=cpk.algorithm)
    options = {
        'body': data,
        'content_length': length,
        'timeout': kwargs.pop('timeout', None),
        'transactional_content_md5': None,
        'lease_access_conditions': access_conditions,
        'append_position_access_conditions': append_conditions,
        'modified_access_conditions': mod_conditions,
        'validate_content': validate_content,
        'cpk_scope_info': cpk_scope_info,
        'cpk_info': cpk_info,
        'cls': return_response_headers}
    options.update(kwargs)
    return options

def _append_block_from_url_options(
    copy_source_url: str,
    source_offset: Optional[int] = None,
    source_length: Optional[int] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Build keyword options for an Append Block From URL call.

    :param copy_source_url: URL of the source blob to append from.
    :param source_offset: Optional start of the source range.
    :param source_length: Optional number of bytes; requires source_offset.
    :returns: Keyword arguments for the generated append_block_from_url operation.
    :raises ValueError: If source_length is given without source_offset.
    """
    copy_source_url = _encode_source_url(source_url=copy_source_url)
    # A length without a starting offset is ambiguous.
    if source_length is not None and source_offset is None:
        raise ValueError("source_offset should also be specified if source_length is specified")
    # Build the source range header; open-ended when only the offset is known.
    if source_offset is None:
        source_range = None
    elif source_length is None:
        source_range = f"bytes={source_offset}-"
    else:
        source_range = f'bytes={source_offset}-{source_offset + source_length - 1}'

    appendpos_condition = kwargs.pop('appendpos_condition', None)
    maxsize_condition = kwargs.pop('maxsize_condition', None)
    source_content_md5 = kwargs.pop('source_content_md5', None)
    append_conditions = None
    if maxsize_condition or appendpos_condition is not None:
        append_conditions = AppendPositionAccessConditions(
            max_size=maxsize_condition,
            append_position=appendpos_condition
        )
    source_authorization = kwargs.pop('source_authorization', None)
    cpk = kwargs.pop('cpk', None)
    # Customer-provided key for service-side encryption, if supplied.
    cpk_info = CpkInfo(
        encryption_key=cpk.key_value,
        encryption_key_sha256=cpk.key_hash,
        encryption_algorithm=cpk.algorithm,
    ) if cpk else None

    # The condition helpers pop their keys from kwargs before the final update.
    options: Dict[str, Any] = {
        'copy_source_authorization': source_authorization,
        'source_url': copy_source_url,
        'content_length': 0,
        'source_range': source_range,
        'source_content_md5': source_content_md5,
        'transactional_content_md5': None,
        'lease_access_conditions': get_access_conditions(kwargs.pop('lease', None)),
        'append_position_access_conditions': append_conditions,
        'modified_access_conditions': get_modify_conditions(kwargs),
        'source_modified_access_conditions': get_source_conditions(kwargs),
        'cpk_scope_info': get_cpk_scope_info(kwargs),
        'cpk_info': cpk_info,
        'cls': return_response_headers,
        'timeout': kwargs.pop('timeout', None),
    }
    options.update(kwargs)
    return options

def _seal_append_blob_options(**kwargs: Any) -> Dict[str, Any]:
    """Build keyword options for a Seal Append Blob call.

    :returns: Keyword arguments for the generated seal operation.
    """
    append_position = kwargs.pop('appendpos_condition', None)
    append_conditions = (
        AppendPositionAccessConditions(append_position=append_position)
        if append_position is not None
        else None
    )
    # get_modify_conditions pops its keys from kwargs before the final update.
    options: Dict[str, Any] = {
        'timeout': kwargs.pop('timeout', None),
        'lease_access_conditions': get_access_conditions(kwargs.pop('lease', None)),
        'append_position_access_conditions': append_conditions,
        'modified_access_conditions': get_modify_conditions(kwargs),
        'cls': return_response_headers,
    }
    options.update(kwargs)
    return options

def _from_blob_url(
    blob_url: str,
    snapshot: Optional[Union[BlobProperties, str, Dict[str, Any]]]
) -> Tuple[str, str, str, Optional[str]]:
    """Decompose a full blob URL into (account_url, container_name, blob_name, snapshot).

    :param blob_url: Full URL to a blob; 'https://' is assumed if no scheme.
    :param snapshot: Overrides any snapshot found in the URL query — may be a
        BlobProperties, a {'snapshot': ...} dict, or a snapshot string.
    :raises ValueError: If the URL is not a string or lacks container/blob parts.
    """
    try:
        if not blob_url.lower().startswith('http'):
            blob_url = "https://" + blob_url
    except AttributeError as exc:
        raise ValueError("Blob URL must be a string.") from exc
    parsed = urlparse(blob_url.rstrip('/'))

    if not parsed.netloc:
        raise ValueError(f"Invalid URL: {blob_url}")

    path = parsed.path.lstrip('/')
    account_path = ""
    if ".core." in parsed.netloc:
        # .core. is indicating non-customized url. Blob name with directory info can also be parsed.
        segments = path.split('/', maxsplit=1)
    elif "localhost" in parsed.netloc or "127.0.0.1" in parsed.netloc:
        # Emulator-style URL: first path segment is the account name.
        segments = path.split('/', maxsplit=2)
        account_path += '/' + segments[0]
    else:
        # for customized url. blob name that has directory info cannot be parsed.
        segments = path.split('/')
        if len(segments) > 2:
            account_path = "/" + "/".join(segments[:-2])

    account_url = f"{parsed.scheme}://{parsed.netloc.rstrip('/')}{account_path}?{parsed.query}"

    msg_invalid_url = "Invalid URL. Provide a blob_url with a valid blob and container name."
    if len(segments) <= 1:
        raise ValueError(msg_invalid_url)
    container_name = unquote(segments[-2])
    blob_name = unquote(segments[-1])
    if not container_name or not blob_name:
        raise ValueError(msg_invalid_url)

    path_snapshot, _ = parse_query(parsed.query)
    if snapshot:
        # An explicit snapshot argument wins over the one embedded in the URL.
        if isinstance(snapshot, BlobProperties):
            path_snapshot = snapshot.snapshot
        elif isinstance(snapshot, dict):
            path_snapshot = snapshot['snapshot']
        else:
            path_snapshot = snapshot
    return (account_url, container_name, blob_name, path_snapshot)
