# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from urllib.parse import unquote
from xml.etree.ElementTree import Element

from ._models import (
    BlobAnalyticsLogging,
    BlobProperties,
    BlobType,
    ContainerProperties,
    ContentSettings,
    CopyProperties,
    CorsRule,
    ImmutabilityPolicy,
    LeaseProperties,
    Metrics,
    ObjectReplicationPolicy,
    ObjectReplicationRule,
    RetentionPolicy,
    StaticWebsite
)
from ._shared.models import get_enum_value
from ._shared.response_handlers import deserialize_metadata

if TYPE_CHECKING:
    from azure.core.pipeline import PipelineResponse
    from ._generated.models import (
        BlobItemInternal,
        BlobTags,
        PageList,
        StorageServiceProperties,
        StorageServiceStats,
    )
    from ._shared.models import LocationMode

def deserialize_pipeline_response_into_cls(cls_method, response: "PipelineResponse", obj: Any, headers: Dict[str, Any]):
    try:
        deserialized_response = response.http_response
    except AttributeError:
        deserialized_response = response
    return cls_method(deserialized_response, obj, headers)


def deserialize_blob_properties(response: "PipelineResponse", obj: Any, headers: Dict[str, Any]) -> BlobProperties:
    blob_properties = BlobProperties(
        metadata=deserialize_metadata(response, obj, headers),
        object_replication_source_properties=deserialize_ors_policies(response.http_response.headers),
        **headers
    )
    if 'Content-Range' in headers:
        if 'x-ms-blob-content-md5' in headers:
            blob_properties.content_settings.content_md5 = headers['x-ms-blob-content-md5']
        else:
            blob_properties.content_settings.content_md5 = None
    return blob_properties


def deserialize_ors_policies(policy_dictionary: Optional[Dict[str, str]]) -> Optional[List[ObjectReplicationPolicy]]:

    if policy_dictionary is None:
        return None
    # For source blobs (blobs that have policy ids and rule ids applied to them),
    # the header will be formatted as "x-ms-or-<policy_id>_<rule_id>: {Complete, Failed}".
    # The value of this header is the status of the replication.
    or_policy_status_headers = {key: val for key, val in policy_dictionary.items()
                                if 'or-' in key and key != 'x-ms-or-policy-id'}

    parsed_result: Dict[str, List[ObjectReplicationRule]] = {}

    for key, val in or_policy_status_headers.items():
        # list blobs gives or-policy_rule and get blob properties gives x-ms-or-policy_rule
        policy_and_rule_ids = key.split('or-')[1].split('_')
        policy_id = policy_and_rule_ids[0]
        rule_id = policy_and_rule_ids[1]

        # If we are seeing this policy for the first time, create a new list to store rule_id -> result
        parsed_result[policy_id] = parsed_result.get(policy_id) or []
        parsed_result[policy_id].append(ObjectReplicationRule(rule_id=rule_id, status=val))

    result_list = [ObjectReplicationPolicy(policy_id=k, rules=v) for k, v in parsed_result.items()]

    return result_list


def deserialize_blob_stream(
    response: "PipelineResponse",
    obj: Any,
    headers: Dict[str, Any]
) -> Tuple["LocationMode", Any]:
    blob_properties = deserialize_blob_properties(response, obj, headers)
    obj.properties = blob_properties
    return response.http_response.location_mode, obj


def deserialize_container_properties(
    response: "PipelineResponse",
    obj: Any,
    headers: Dict[str, Any]
) -> ContainerProperties:
    metadata = deserialize_metadata(response, obj, headers)
    container_properties = ContainerProperties(
        metadata=metadata,
        **headers
    )
    return container_properties


def get_page_ranges_result(ranges: "PageList") -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]:
    page_range = []
    clear_range = []
    if ranges.page_range:
        page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range]
    if ranges.clear_range:
        clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range]
    return page_range, clear_range


def service_stats_deserialize(generated: "StorageServiceStats") -> Dict[str, Any]:
    status = None
    last_sync_time = None
    if generated.geo_replication is not None:
        status = generated.geo_replication.status
        last_sync_time = generated.geo_replication.last_sync_time
    return {
        'geo_replication': {
            'status': status,
            'last_sync_time': last_sync_time
        }
    }

def service_properties_deserialize(generated: "StorageServiceProperties") -> Dict[str, Any]:
    cors_list = None
    if generated.cors is not None:
        cors_list = [CorsRule._from_generated(cors) for cors in generated.cors]  # pylint: disable=protected-access
    return {
        'analytics_logging': BlobAnalyticsLogging._from_generated(generated.logging),  # pylint: disable=protected-access
        'hour_metrics': Metrics._from_generated(generated.hour_metrics),  # pylint: disable=protected-access
        'minute_metrics': Metrics._from_generated(generated.minute_metrics),  # pylint: disable=protected-access
        'cors': cors_list,
        'target_version': generated.default_service_version,
        'delete_retention_policy': RetentionPolicy._from_generated(generated.delete_retention_policy),  # pylint: disable=protected-access
        'static_website': StaticWebsite._from_generated(generated.static_website),  # pylint: disable=protected-access
    }


def get_blob_properties_from_generated_code(generated: "BlobItemInternal") -> BlobProperties:
    blob = BlobProperties()
    if generated.name.encoded and generated.name.content is not None:
        blob.name = unquote(generated.name.content)
    else:
        blob.name = generated.name.content  #type: ignore
    blob_type = get_enum_value(generated.properties.blob_type)
    blob.blob_type = BlobType(blob_type)
    blob.etag = generated.properties.etag
    blob.deleted = generated.deleted
    blob.snapshot = generated.snapshot
    blob.is_append_blob_sealed = generated.properties.is_sealed
    blob.metadata = generated.metadata.additional_properties if generated.metadata else {}  # type: ignore [assignment]
    blob.encrypted_metadata = generated.metadata.encrypted if generated.metadata else None
    blob.lease = LeaseProperties._from_generated(generated)  # pylint: disable=protected-access
    blob.copy = CopyProperties._from_generated(generated)  # pylint: disable=protected-access
    blob.last_modified = generated.properties.last_modified
    blob.creation_time = generated.properties.creation_time  # type: ignore [assignment]
    blob.content_settings = ContentSettings._from_generated(generated)  # pylint: disable=protected-access
    blob.size = generated.properties.content_length  # type: ignore [assignment]
    blob.page_blob_sequence_number = generated.properties.blob_sequence_number
    blob.server_encrypted = generated.properties.server_encrypted  # type: ignore [assignment]
    blob.encryption_scope = generated.properties.encryption_scope
    blob.deleted_time = generated.properties.deleted_time
    blob.remaining_retention_days = generated.properties.remaining_retention_days
    blob.blob_tier = generated.properties.access_tier  # type: ignore [assignment]
    blob.rehydrate_priority = generated.properties.rehydrate_priority
    blob.blob_tier_inferred = generated.properties.access_tier_inferred
    blob.archive_status = generated.properties.archive_status
    blob.blob_tier_change_time = generated.properties.access_tier_change_time
    blob.version_id = generated.version_id
    blob.is_current_version = generated.is_current_version
    blob.tag_count = generated.properties.tag_count
    blob.tags = parse_tags(generated.blob_tags)
    blob.object_replication_source_properties = deserialize_ors_policies(generated.object_replication_metadata)
    blob.last_accessed_on = generated.properties.last_accessed_on
    blob.immutability_policy = ImmutabilityPolicy._from_generated(generated)  # pylint: disable=protected-access
    blob.has_legal_hold = generated.properties.legal_hold
    blob.has_versions_only = generated.has_versions_only
    return blob

def parse_tags(generated_tags: Optional["BlobTags"]) -> Optional[Dict[str, str]]:
    """Deserialize a list of BlobTag objects into a dict.

    :param Optional[BlobTags] generated_tags:
        A list containing the BlobTag objects from generated code.
    :returns: A dictionary of the BlobTag objects.
    :rtype: Optional[Dict[str, str]]
    """
    if generated_tags:
        tag_dict = {t.key: t.value for t in generated_tags.blob_tag_set}
        return tag_dict
    return None


def load_single_xml_node(element: Element, name: str) -> Optional[Element]:
    return element.find(name)


def load_many_xml_nodes(
    element: Element,
    name: str,
    wrapper: Optional[str] = None
) -> List[Optional[Element]]:
    found_element: Optional[Element] = element
    if wrapper:
        found_element = load_single_xml_node(element, wrapper)
    if found_element is None:
        return []
    return list(found_element.findall(name))


def load_xml_string(element: Element, name: str) -> Optional[str]:
    node = element.find(name)
    if node is None or not node.text:
        return None
    return node.text


def load_xml_int(element: Element, name: str) -> Optional[int]:
    node = element.find(name)
    if node is None or not node.text:
        return None
    return int(node.text)
