# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=docstring-missing-return, docstring-missing-rtype, too-many-lines

"""Representation of Avro schemas.

A schema may be one of:
 - A record, mapping field names to field value data;
 - An error, equivalent to a record;
 - An enum, containing one of a small set of symbols;
 - An array of values, all of the same schema;
 - A map containing string/value pairs, each of a declared schema;
 - A union of other schemas;
 - A fixed sized binary object;
 - A unicode string;
 - A sequence of bytes;
 - A 32-bit signed int;
 - A 64-bit signed long;
 - A 32-bit floating-point float;
 - A 64-bit floating-point double;
 - A boolean;
 - Null.
"""

import abc
import json
import logging
import re
logger = logging.getLogger(__name__)

# ------------------------------------------------------------------------------
# Constants

# Custom log level, numerically below DEBUG (10), used for very chatty
# messages such as schema-name registration.
DEBUG_VERBOSE = 5

# Type identifiers used by the schema classes in this module.
NULL = 'null'
BOOLEAN = 'boolean'
STRING = 'string'
BYTES = 'bytes'
INT = 'int'
LONG = 'long'
FLOAT = 'float'
DOUBLE = 'double'
FIXED = 'fixed'
ENUM = 'enum'
RECORD = 'record'
ERROR = 'error'
ARRAY = 'array'
MAP = 'map'
UNION = 'union'

# Request and error unions are part of Avro protocols:
REQUEST = 'request'
ERROR_UNION = 'error_union'

# Types that carry no name and no nested schema.
PRIMITIVE_TYPES = frozenset([
    NULL,
    BOOLEAN,
    STRING,
    BYTES,
    INT,
    LONG,
    FLOAT,
    DOUBLE,
])

# Types whose schemas are identified by a (possibly namespaced) name.
NAMED_TYPES = frozenset([
    FIXED,
    ENUM,
    RECORD,
    ERROR,
])

# Every type identifier accepted by Schema.__init__: the primitive types, the
# named types, and the remaining unnamed complex/protocol types listed here.
VALID_TYPES = frozenset.union(
    PRIMITIVE_TYPES,
    NAMED_TYPES,
    [
        ARRAY,
        MAP,
        UNION,
        REQUEST,
        ERROR_UNION,
    ],
)

# Schema property names with a defined meaning; Schema.other_props excludes
# these when reporting the non-reserved properties.
SCHEMA_RESERVED_PROPS = frozenset([
    'type',
    'name',
    'namespace',
    'fields',  # Record
    'items',  # Array
    'size',  # Fixed
    'symbols',  # Enum
    'values',  # Map
    'doc',
])

# Record-field property names with a defined meaning; Field.other_props
# excludes these when reporting the non-reserved properties.
FIELD_RESERVED_PROPS = frozenset([
    'default',
    'name',
    'doc',
    'order',
    'type',
])

# Values accepted for the 'order' property of a record field.
VALID_FIELD_SORT_ORDERS = frozenset([
    'ascending',
    'descending',
    'ignore',
])


# ------------------------------------------------------------------------------
# Exceptions


class Error(Exception):
    """Base class for errors in this module.

    AvroException, and through it SchemaParseException, derive from this class.
    """


class AvroException(Error):
    """Generic Avro schema error.

    Raised in this module for invalid schema construction arguments, for
    example a non-primitive type passed to PrimitiveSchema or invalid
    branches passed to UnionSchema.
    """


class SchemaParseException(AvroException):
    """Error while parsing a JSON schema descriptor.

    Raised in this module for invalid type identifiers, malformed or duplicate
    schema names, invalid record fields and unknown named schemas.
    """


class Schema(metaclass=abc.ABCMeta):
    """Common abstract ancestor of every Avro schema class.

    Holds the schema type and a property dictionary; concrete subclasses
    supply the JSON conversion through to_json().
    """

    def __init__(self, data_type, other_props=None):
        """Sets up the type and the property dictionary of a schema.

        Args:
          data_type: Type identifier of the schema; must be in VALID_TYPES.
          other_props: Optional dictionary of extra properties, merged into
              the property dictionary after the 'type' entry.

        Raises:
          SchemaParseException: if data_type is not a valid Avro type.
        """
        if data_type not in VALID_TYPES:
            raise SchemaParseException(f'{data_type!r} is not a valid Avro type.')

        self._type = data_type

        # Property name -> property value, seeded with the schema type.
        self._props = {'type': data_type}
        if other_props:
            self._props.update(other_props)

    @property
    def namespace(self):
        """Returns: the 'namespace' property of this schema, or None if unset."""
        return self._props.get('namespace')

    @property
    def type(self):
        """Returns: the type identifier this schema was created with."""
        return self._type

    @property
    def doc(self):
        """Returns: the 'doc' property of this schema, or None if unset."""
        return self._props.get('doc')

    @property
    def props(self):
        """Exposes every property of this schema, reserved or not.

        The JSON form of the schema is produced from this dictionary.

        Returns:
          The internal property dictionary (not a copy).
        """
        return self._props

    @property
    def other_props(self):
        """Returns: a new dictionary holding only the non-reserved properties."""
        return {
            key: value
            for key, value in self._props.items()
            if key not in SCHEMA_RESERVED_PROPS
        }

    def __str__(self):
        """Returns: this schema serialized as a JSON string."""
        json_data = self.to_json(names=None)
        return json.dumps(json_data)

    @abc.abstractmethod
    def to_json(self, names):
        """Converts this schema into its Avro specification representation.

        Schema types that have names (records, enums, and fixed) must take
        care not to re-define schemas already listed in ``names``.

        :param Any names: Tracker of the named schemas already emitted.
        :returns: JSON-serializable data describing this schema.
        """


# ------------------------------------------------------------------------------


# Pattern for a single identifier: a letter or underscore followed by letters,
# digits or underscores. The pattern is not anchored.
# NOTE(review): not referenced in the visible part of this file - confirm
# whether it is still needed.
_RE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

# Pattern for a complete (anchored) schema name: an optional leading dot, any
# number of dot-terminated namespace components, then the simple name, which
# is captured as group 1.
_RE_FULL_NAME = re.compile(
    r'^'
    r'[.]?(?:[A-Za-z_][A-Za-z0-9_]*[.])*'  # optional namespace
    r'([A-Za-z_][A-Za-z0-9_]*)'  # name
    r'$'
)


class Name(object):
    """Representation of an Avro name.

    Two names are equal, and hash identically, when their full names match.
    """

    def __init__(self, name, namespace=None):
        """Parses an Avro name.

        Args:
          name: Avro name to parse (relative or absolute).
          namespace: Optional explicit namespace if the name is relative.

        Raises:
          SchemaParseException: if the resulting full name is not well formed.
        """
        # Normalize: namespace is always defined as a string, possibly empty.
        if namespace is None:
            namespace = ''

        if '.' in name:
            # name is absolute, namespace is ignored:
            self._fullname = name

            match = _RE_FULL_NAME.match(self._fullname)
            if match is None:
                raise SchemaParseException(
                    f'Invalid absolute schema name: {self._fullname!r}.')

            self._name = match.group(1)
            # Everything before the final '.<simple name>' is the namespace.
            self._namespace = self._fullname[:-(len(self._name) + 1)]

        else:
            # name is relative, combine with explicit namespace:
            self._name = name
            self._namespace = namespace
            self._fullname = (self._name
                              if (not self._namespace) else
                              f'{self._namespace}.{self._name}')

            # Validate the fullname:
            if _RE_FULL_NAME.match(self._fullname) is None:
                raise SchemaParseException(f"Invalid schema name {self._fullname!r} inferred from "
                                           f"name {self._name!r} and namespace {self._namespace!r}.")

    def __eq__(self, other):
        """Compares two names by their full name."""
        if not isinstance(other, Name):
            return NotImplemented
        return self.fullname == other.fullname

    def __hash__(self):
        """Hashes on the full name, consistently with __eq__.

        Defining __eq__ alone would leave the class unhashable, which would
        prevent names from being used in sets or as dictionary keys.
        """
        return hash(self.fullname)

    @property
    def simple_name(self):
        """Returns: the simple name part of this name."""
        return self._name

    @property
    def namespace(self):
        """Returns: this name's namespace, possibly the empty string."""
        return self._namespace

    @property
    def fullname(self):
        """Returns: the full name."""
        return self._fullname


# ------------------------------------------------------------------------------


class Names(object):
    """Registry of named Avro schemas plus the default namespace used while parsing."""

    def __init__(self, default_namespace=None, names=None):
        """Creates a name tracker.

        Args:
          default_namespace: Optional default namespace.
          names: Optional initial mapping of known named schemas; the mapping
              is stored as is (not copied). A new empty dict is used if None.
        """
        self._default_namespace = default_namespace
        self._names = {} if names is None else names

    @property
    def names(self):
        """Returns: the mapping from full name to registered schema."""
        return self._names

    @property
    def default_namespace(self):
        """Returns: the default namespace of this tracker, or None."""
        return self._default_namespace

    def new_with_default_namespace(self, namespace):
        """Derives a tracker sharing this tracker's names under another default namespace.

        :param Any namespace: New default namespace to use.
        :returns: New name tracker with the specified default namespace.
        :rtype: Names
        """
        return Names(default_namespace=namespace, names=self._names)

    def get_name(self, name, namespace=None):
        """Builds the Name for ``name`` using this tracker's default namespace if needed.

        :param Any name: Name to resolve (absolute or relative).
        :param Optional[Any] namespace: Optional explicit namespace.
        :returns: The specified name, resolved according to this tracker.
        :rtype: Name
        """
        effective_namespace = self._default_namespace if namespace is None else namespace
        return Name(name=name, namespace=effective_namespace)

    def get_schema(self, name, namespace=None):
        """Looks up a registered schema by name.

        :param Any name: Name (absolute or relative) of the Avro schema to look up.
        :param Optional[Any] namespace: Optional explicit namespace.
        :returns: The schema with the specified name, if any, or None
        :rtype: Union[Any, None]
        """
        resolved = self.get_name(name=name, namespace=namespace)
        return self._names.get(resolved.fullname)

    def prune_namespace(self, properties):
        """Drops a 'namespace' property that merely repeats the default namespace.

        :param Any properties: Schema properties to examine.
        :returns: ``properties`` itself when nothing has to be removed, otherwise
            a copy without the 'namespace' entry.
        :rtype: Any
        """
        own_namespace = self.default_namespace
        redundant = (
            own_namespace is not None
            and 'namespace' in properties
            and properties['namespace'] == own_namespace
        )
        if not redundant:
            # No default here, no namespace there, or they differ: keep as is.
            return properties

        # Both sides declare the same namespace: strip it from a copy.
        pruned = properties.copy()
        pruned.pop('namespace')
        return pruned

    def register(self, schema):
        """Adds a named schema to this tracker.

        :param Any schema: Named Avro schema to register in this tracker.
        :raises SchemaParseException: if the full name is a reserved type name
            or is already registered.
        """
        fullname = schema.fullname
        if fullname in VALID_TYPES:
            raise SchemaParseException(
                f'{fullname} is a reserved type name.')
        if fullname in self.names:
            raise SchemaParseException(
                f'Avro name {fullname!r} already exists.')

        logger.log(DEBUG_VERBOSE, 'Register new name for %r', fullname)
        self._names[fullname] = schema


# ------------------------------------------------------------------------------


class NamedSchema(Schema):
    """Abstract ancestor of the schemas that carry a name.

    The named schema types are listed in NAMED_TYPES.
    """

    def __init__(
            self,
            data_type,
            name=None,
            namespace=None,
            names=None,
            other_props=None,
    ):
        """Resolves the schema name, initializes the schema and registers it.

        Args:
          data_type: Type of the named schema.
          name: Name (absolute or relative) of the schema.
          namespace: Optional explicit namespace if name is relative.
          names: Tracker to resolve and register Avro names.
          other_props: Optional map of additional properties of the schema.
        """
        assert (data_type in NAMED_TYPES), (f'Invalid named type: {data_type!r}')

        # The name must be resolved before registration, which reads fullname.
        resolved_name = names.get_name(name=name, namespace=namespace)
        self._avro_name = resolved_name

        super().__init__(data_type, other_props)

        names.register(self)

        self._props['name'] = self.name
        own_namespace = self.namespace
        if own_namespace:
            self._props['namespace'] = own_namespace

    @property
    def avro_name(self):
        """Returns: the Name object holding this schema's resolved name."""
        return self._avro_name

    @property
    def name(self):
        """Returns: the simple name of this schema."""
        return self._avro_name.simple_name

    @property
    def namespace(self):
        """Returns: the namespace of this schema's name."""
        return self._avro_name.namespace

    @property
    def fullname(self):
        """Returns: the full name of this schema."""
        return self._avro_name.fullname

    def name_ref(self, names):
        """Gives the name to use when referring to this schema from ``names``.

        :param Any names: Avro name tracker to relativize this schema name against.
        :returns: The simple name when this schema's namespace equals the tracker's
            default namespace, otherwise the full name.
        :rtype: Any
        """
        same_namespace = self.namespace == names.default_namespace
        return self.name if same_namespace else self.fullname

    @abc.abstractmethod
    def to_json(self, names):
        """Converts this schema into its Avro specification representation.

        Schema types that have names (records, enums, and fixed) must take
        care not to re-define schemas already listed in ``names``.

        :param Any names: Tracker of the named schemas already emitted.
        :returns: JSON-serializable data describing this schema.
        """

# ------------------------------------------------------------------------------


# Sentinel used as the 'default' argument when a field declares no default
# value, so that None remains usable as an actual default.
_NO_DEFAULT = object()


class Field(object):
    """Representation of the schema of a field in a record."""

    def __init__(
            self,
            data_type,
            name,
            index,
            has_default,
            default=_NO_DEFAULT,
            order=None,
            doc=None,
            other_props=None
    ):
        """Initializes a new Field object.

        Args:
          data_type: Avro schema of the field.
          name: Name of the field; must be a non-empty string.
          index: 0-based position of the field.
          has_default: Whether the field declares a default value.
          default: Default value; only recorded when has_default is true.
          order: Optional sort order, one of VALID_FIELD_SORT_ORDERS.
          doc: Optional documentation string.
          other_props: Optional dictionary of additional properties.

        Raises:
          SchemaParseException: if the name or the sort order is invalid.
        """
        if (not isinstance(name, str)) or (not name):
            raise SchemaParseException(f'Invalid record field name: {name!r}.')
        if (order is not None) and (order not in VALID_FIELD_SORT_ORDERS):
            raise SchemaParseException(f'Invalid record field order: {order!r}.')

        # All properties of this record field:
        self._props = {}

        self._has_default = has_default
        # Extra properties go in first, so the explicit arguments below win
        # over any same-named entry.
        if other_props:
            self._props.update(other_props)

        self._index = index
        self._type = self._props['type'] = data_type
        self._name = self._props['name'] = name

        if has_default:
            self._props['default'] = default

        if order is not None:
            self._props['order'] = order

        if doc is not None:
            self._props['doc'] = doc

    @property
    def type(self):
        """Returns: the schema of this field."""
        return self._type

    @property
    def name(self):
        """Returns: this field name."""
        return self._name

    @property
    def index(self):
        """Returns: the 0-based index of this field in the record."""
        return self._index

    @property
    def default(self):
        """Returns: the default value; raises KeyError if none was recorded."""
        return self._props['default']

    @property
    def has_default(self):
        """Returns: the has_default flag given at construction."""
        return self._has_default

    @property
    def order(self):
        """Returns: the sort order of this field, or None if unset."""
        return self._props.get('order', None)

    @property
    def doc(self):
        """Returns: the documentation of this field, or None if unset."""
        return self._props.get('doc', None)

    @property
    def props(self):
        """Returns: the internal dictionary of all properties of this field."""
        return self._props

    @property
    def other_props(self):
        """Returns: the non-reserved properties, as produced by filter_keys_out.

        Note: unlike Schema.other_props, the result is not wrapped in dict().
        """
        return filter_keys_out(items=self._props, keys=FIELD_RESERVED_PROPS)

    def __str__(self):
        """Returns: the JSON representation of this field."""
        return json.dumps(self.to_json())

    def to_json(self, names=None):
        """Converts this field to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: A copy of the properties with 'type' replaced by the JSON
            form of the field schema.
        """
        if names is None:
            names = Names()
        to_dump = self.props.copy()
        to_dump['type'] = self.type.to_json(names)
        return to_dump

    def __eq__(self, that):
        """Compares two fields through their JSON representations.

        Non-Field operands yield NotImplemented rather than being run through
        json.loads(str(...)), which raised for values such as None.
        """
        if not isinstance(that, Field):
            return NotImplemented
        to_cmp = json.loads(str(self))
        return to_cmp == json.loads(str(that))


# ------------------------------------------------------------------------------
# Primitive Types


class PrimitiveSchema(Schema):
    """Schema of a primitive Avro type.

    Valid primitive types are defined in PRIMITIVE_TYPES.
    """

    def __init__(self, data_type, other_props=None):
        """Initializes a new schema object for the specified primitive type.

        Args:
          data_type: Type of the schema to construct. Must be primitive.
          other_props: Optional dictionary of additional properties.

        Raises:
          AvroException: if data_type is not a primitive type.
        """
        if data_type not in PRIMITIVE_TYPES:
            raise AvroException(f'{data_type!r} is not a valid primitive type.')
        super(PrimitiveSchema, self).__init__(data_type, other_props=other_props)

    @property
    def name(self):
        """Returns: the simple name of this schema."""
        # The name of a primitive type is the type itself.
        return self.type

    @property
    def fullname(self):
        """Returns: the fully qualified name of this schema."""
        # The full name is the simple name for primitive schema.
        return self.name

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Unused for primitive schemas.
        :returns: The bare type name when 'type' is the only property,
            otherwise the property dictionary.
        """
        if len(self.props) == 1:
            return self.fullname
        return self.props

    def __eq__(self, that):
        """Compares the properties of two schemas.

        Operands that are not schemas yield NotImplemented instead of raising
        AttributeError on the missing ``props`` attribute.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        return self.props == that.props


# ------------------------------------------------------------------------------
# Complex Types (non-recursive)


class FixedSchema(NamedSchema):
    """Schema of a fixed-size binary object."""

    def __init__(
            self,
            name,
            namespace,
            size,
            names=None,
            other_props=None,
    ):
        """Initializes a new fixed schema object.

        Args:
          name: Name (absolute or relative) of the schema.
          namespace: Optional explicit namespace if name is relative.
          size: Size of the fixed object, in bytes; a non-negative integer.
          names: Tracker to resolve and register Avro names.
          other_props: Optional map of additional properties of the schema.

        Raises:
          AvroException: if size is not a non-negative integer.
        """
        # Ensure valid ctor args. bool is a subclass of int, so it must be
        # rejected explicitly; a negative byte count is meaningless.
        if isinstance(size, bool) or not isinstance(size, int) or size < 0:
            fail_msg = 'Fixed Schema requires a valid integer for size property.'
            raise AvroException(fail_msg)

        super(FixedSchema, self).__init__(
            data_type=FIXED,
            name=name,
            namespace=namespace,
            names=names,
            other_props=other_props,
        )
        self._props['size'] = size

    @property
    def size(self):
        """Returns: the size of this fixed schema, in bytes."""
        return self._props['size']

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: A name reference if this schema is already listed in
            ``names``; otherwise the properties, after recording the schema
            in ``names``.
        """
        if names is None:
            names = Names()
        if self.fullname in names.names:
            return self.name_ref(names)
        names.names[self.fullname] = self
        return names.prune_namespace(self.props)

    def __eq__(self, that):
        """Compares the properties of two schemas.

        Operands that are not schemas yield NotImplemented instead of raising
        AttributeError on the missing ``props`` attribute.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        return self.props == that.props


# ------------------------------------------------------------------------------


class EnumSchema(NamedSchema):
    """Schema of an enumeration."""

    def __init__(
            self,
            name,
            namespace,
            symbols,
            names=None,
            doc=None,
            other_props=None,
    ):
        """Initializes a new enumeration schema object.

        Args:
          name: Simple name of this enumeration.
          namespace: Optional namespace.
          symbols: Ordered list of symbols defined in this enumeration.
          names: Tracker to resolve and register Avro names.
          doc: Optional documentation string.
          other_props: Optional map of additional properties of the schema.

        Raises:
          AvroException: if symbols is missing, not a collection of strings,
              or contains duplicates.
        """
        # A bare string is iterable and would be split into one-character
        # symbols; a missing (None) or non-iterable value would surface as a
        # raw TypeError. Report both as invalid symbols.
        if isinstance(symbols, str):
            raise AvroException(
                f'Invalid symbols for enum schema: {symbols!r}.')
        try:
            symbols = tuple(symbols)
        except TypeError:
            raise AvroException(
                f'Invalid symbols for enum schema: {symbols!r}.') from None

        symbol_set = frozenset(symbols)
        if (len(symbol_set) != len(symbols)
                or not all(isinstance(symbol, str) for symbol in symbols)):
            raise AvroException(
                f'Invalid symbols for enum schema: {symbols!r}.')

        super(EnumSchema, self).__init__(
            data_type=ENUM,
            name=name,
            namespace=namespace,
            names=names,
            other_props=other_props,
        )

        self._props['symbols'] = symbols
        if doc is not None:
            self._props['doc'] = doc

    @property
    def symbols(self):
        """Returns: the symbols defined in this enum."""
        return self._props['symbols']

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: A name reference if this schema is already listed in
            ``names``; otherwise the properties, after recording the schema
            in ``names``.
        """
        if names is None:
            names = Names()
        if self.fullname in names.names:
            return self.name_ref(names)
        names.names[self.fullname] = self
        return names.prune_namespace(self.props)

    def __eq__(self, that):
        """Compares the properties of two schemas.

        Operands that are not schemas yield NotImplemented instead of raising
        AttributeError on the missing ``props`` attribute.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        return self.props == that.props


# ------------------------------------------------------------------------------
# Complex Types (recursive)


class ArraySchema(Schema):
    """Schema of an array."""

    def __init__(self, items, other_props=None):
        """Initializes a new array schema object.

        Args:
          items: Avro schema of the array items.
          other_props: Optional dictionary of additional properties.
        """
        super(ArraySchema, self).__init__(
            data_type=ARRAY,
            other_props=other_props,
        )
        self._items_schema = items
        self._props['items'] = items

    @property
    def items(self):
        """Returns: the schema of the items in this array."""
        return self._items_schema

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: A copy of the properties with 'items' replaced by the JSON
            form of the item schema.
        """
        if names is None:
            names = Names()
        to_dump = self.props.copy()
        item_schema = self.items
        to_dump['items'] = item_schema.to_json(names)
        return to_dump

    def __eq__(self, that):
        """Compares two schemas through their JSON representations.

        Operands that are not schemas yield NotImplemented rather than being
        run through json.loads(str(...)), which raised for values such as None.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        to_cmp = json.loads(str(self))
        return to_cmp == json.loads(str(that))


# ------------------------------------------------------------------------------


class MapSchema(Schema):
    """Schema of a map."""

    def __init__(self, values, other_props=None):
        """Initializes a new map schema object.

        Args:
          values: Avro schema of the map values.
          other_props: Optional dictionary of additional properties.
        """
        super(MapSchema, self).__init__(
            data_type=MAP,
            other_props=other_props,
        )
        self._values_schema = values
        self._props['values'] = values

    @property
    def values(self):
        """Returns: the schema of the values in this map."""
        return self._values_schema

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: A copy of the properties with 'values' replaced by the JSON
            form of the value schema.
        """
        if names is None:
            names = Names()
        to_dump = self.props.copy()
        to_dump['values'] = self.values.to_json(names)
        return to_dump

    def __eq__(self, that):
        """Compares two schemas through their JSON representations.

        Operands that are not schemas yield NotImplemented rather than being
        run through json.loads(str(...)), which raised for values such as None.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        to_cmp = json.loads(str(self))
        return to_cmp == json.loads(str(that))


# ------------------------------------------------------------------------------


class UnionSchema(Schema):
    """Schema of a union."""

    def __init__(self, schemas):
        """Initializes a new union schema object.

        Args:
          schemas: Ordered collection of schema branches in the union.

        Raises:
          AvroException: if two named branches share a full name, if an
              unnamed branch is itself a union, or if two unnamed branches
              share a type.
        """
        super(UnionSchema, self).__init__(data_type=UNION)
        self._schemas = tuple(schemas)

        def describe_branches():
            # One bullet per branch, appended to the validation error messages.
            return ''.join(f'\n\t - {schema}' for schema in self._schemas)

        # Validate the schema branches:

        # All named schema names are unique:
        named_branches = [
            schema for schema in self._schemas if schema.type in NAMED_TYPES]
        unique_names = {schema.fullname for schema in named_branches}
        if len(unique_names) != len(named_branches):
            raise AvroException(
                f'Invalid union branches with duplicate schema name:{describe_branches()}')

        # Types are unique within unnamed schemas, and union is not allowed:
        unnamed_types = [
            schema.type for schema in self._schemas if schema.type not in NAMED_TYPES]
        if UNION in unnamed_types:
            raise AvroException(
                f'Invalid union branches contain other unions:{describe_branches()}')
        if len(set(unnamed_types)) != len(unnamed_types):
            raise AvroException(
                f'Invalid union branches with duplicate type:{describe_branches()}')

    @property
    def schemas(self):
        """Returns: the ordered tuple of schema branches in the union."""
        return self._schemas

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: The list of the JSON forms of the branches, in order.
        """
        if names is None:
            names = Names()
        return [schema.to_json(names) for schema in self.schemas]

    def __eq__(self, that):
        """Compares two schemas through their JSON representations.

        Operands that are not schemas yield NotImplemented rather than being
        run through json.loads(str(...)), which raised for values such as None.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        to_cmp = json.loads(str(self))
        return to_cmp == json.loads(str(that))


# ------------------------------------------------------------------------------


class ErrorUnionSchema(UnionSchema):
    """Union of the errors declared by a protocol message."""

    def __init__(self, schemas):
        """Builds the error union, with a leading "string" branch.

        Args:
          schemas: collection of error schemas.
        """
        # The extra "string" branch stands for system errors.
        system_error = PrimitiveSchema(data_type=STRING)
        branches = [system_error, *schemas]
        super().__init__(schemas=branches)

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: The list of the JSON forms of the branches, leaving out the
            branches of type "string".
        """
        tracker = Names() if names is None else names
        # The system error ("string") branch is not part of the output.
        return [
            branch.to_json(tracker)
            for branch in self.schemas
            if branch.type != STRING
        ]


# ------------------------------------------------------------------------------


class RecordSchema(NamedSchema):
    """Schema of a record."""

    @staticmethod
    def _make_field(index, field_desc, names):
        """Builds one field schema from its JSON descriptor.

        :param int index: 0-based index of the field in the record.
        :param Any field_desc: JSON descriptor of a record field.
        :param Any names: The names for this schema.
        :returns: The field schema.
        :rtype: Field
        """
        field_schema = schema_from_json_data(
            json_data=field_desc['type'],
            names=names,
        )
        other_props = (
            dict(filter_keys_out(items=field_desc, keys=FIELD_RESERVED_PROPS)))
        return Field(
            data_type=field_schema,
            name=field_desc['name'],
            index=index,
            has_default=('default' in field_desc),
            default=field_desc.get('default', _NO_DEFAULT),
            order=field_desc.get('order', None),
            doc=field_desc.get('doc', None),
            other_props=other_props,
        )

    @staticmethod
    def make_field_list(field_desc_list, names):
        """Lazily builds field schemas from a list of field JSON descriptors.

        This is a generator. It does not check field names for duplicates;
        that check is performed by _make_field_map.

        :param Any field_desc_list: Collection of field JSON descriptors.
        :param Any names: The names for this schema.
        :returns: An iterator over the field schemas, in descriptor order.
        :rtype: Iterator[Field]
        """
        for index, field_desc in enumerate(field_desc_list):
            yield RecordSchema._make_field(index, field_desc, names)

    @staticmethod
    def _make_field_map(fields):
        """Builds the field map.
        Guarantees field name unicity.

        :param Any fields: Iterable of field schema.
        :returns: A map of field schemas, indexed by name.
        :rtype: Dict[Any, Any]
        :raises SchemaParseException: if two fields share a name.
        """
        field_map = {}
        for field in fields:
            if field.name in field_map:
                raise SchemaParseException(
                    f'Duplicate record field name {field.name!r}.')
            field_map[field.name] = field
        return field_map

    def __init__(
            self,
            name,
            namespace,
            fields=None,
            make_fields=None,
            names=None,
            record_type=RECORD,
            doc=None,
            other_props=None
    ):
        """Initializes a new record schema object.

        Args:
          name: Name of the record (absolute or relative).
          namespace: Optional namespace the record belongs to, if name is relative.
          fields: collection of fields to add to this record.
              Exactly one of fields or make_fields must be specified.
          make_fields: function creating the fields that belong to the record.
              The function signature is: make_fields(names) -> ordered field list.
              Exactly one of fields or make_fields must be specified.
          names: Tracker to resolve and register Avro names.
          record_type: Type of the record: one of RECORD, ERROR or REQUEST.
              Protocol requests are not named.
          doc: Optional documentation string.
          other_props: Optional map of additional properties of the schema.

        Raises:
          SchemaParseException: if record_type is invalid or two fields share
              a name.
        """
        if record_type == REQUEST:
            # Protocol requests are not named, so NamedSchema.__init__ (which
            # only accepts NAMED_TYPES and registers a name) must be skipped:
            # initialize the base Schema directly.
            Schema.__init__(
                self,
                data_type=REQUEST,
                other_props=other_props,
            )
            # No name, hence no need to change the default namespace:
            nested_names = names
        elif record_type in [RECORD, ERROR]:
            # Register this record name in the tracker:
            super(RecordSchema, self).__init__(
                data_type=record_type,
                name=name,
                namespace=namespace,
                names=names,
                other_props=other_props,
            )
            # Nested schemas default to the namespace of this record:
            nested_names = names.new_with_default_namespace(namespace=self.namespace)
        else:
            raise SchemaParseException(
                f'Invalid record type: {record_type!r}.')

        if fields is None:
            fields = make_fields(names=nested_names)
        else:
            assert make_fields is None
        self._fields = tuple(fields)

        self._field_map = RecordSchema._make_field_map(self._fields)

        # Store the materialized tuple: ``fields`` may be a one-shot iterable
        # that tuple() above has already consumed.
        self._props['fields'] = self._fields
        if doc is not None:
            self._props['doc'] = doc

    @property
    def fields(self):
        """Returns: the field schemas, as an ordered tuple."""
        return self._fields

    @property
    def field_map(self):
        """Returns: the map of the field schemas indexed by field names."""
        return self._field_map

    def to_json(self, names=None):
        """Converts this schema to JSON-serializable data.

        :param Any names: Tracker of the named schemas already emitted; a new
            empty tracker is used when None.
        :returns: For a request, the list of the JSON forms of the fields.
            Otherwise a name reference if this schema is already listed in
            ``names``, or the properties with 'fields' converted, after
            recording the schema in ``names``.
        """
        if names is None:
            names = Names()
        # Request records don't have names
        if self.type == REQUEST:
            return [f.to_json(names) for f in self.fields]

        if self.fullname in names.names:
            return self.name_ref(names)
        names.names[self.fullname] = self

        to_dump = names.prune_namespace(self.props.copy())
        to_dump['fields'] = [f.to_json(names) for f in self.fields]
        return to_dump

    def __eq__(self, that):
        """Compares two schemas through their JSON representations.

        Operands that are not schemas yield NotImplemented rather than being
        run through json.loads(str(...)), which raised for values such as None.
        """
        if not isinstance(that, Schema):
            return NotImplemented
        to_cmp = json.loads(str(self))
        return to_cmp == json.loads(str(that))


# ------------------------------------------------------------------------------
# Module functions


def filter_keys_out(items, keys):
    """Lazily yields the (key, value) pairs of a mapping, minus excluded keys.

    :param Dict[Any, Any] items: Dictionary of items to filter the keys out of.
    :param Any keys: Container of the keys to exclude; only membership tests
        are performed on it.
    :returns: An iterator over the remaining (key, value) pairs.
    :rtype: Iterator[Tuple[Any, Any]]
    """
    yield from (
        (key, value)
        for key, value in items.items()
        if key not in keys
    )


# ------------------------------------------------------------------------------


def _schema_from_json_string(json_string, names):
    """Resolves a JSON string descriptor into a schema.

    :param Any json_string: A primitive type name, or the name of a schema
        already known to ``names``.
    :param Any names: Tracker used to look up named schemas.
    :returns: A new PrimitiveSchema, or the registered named schema.
    :raises SchemaParseException: if the string is neither a primitive type
        nor a known schema name.
    """
    if json_string not in PRIMITIVE_TYPES:
        # Not a primitive: it has to name a schema that is already known.
        known_schema = names.get_schema(name=json_string)
        if known_schema is not None:
            return known_schema
        raise SchemaParseException(f"Unknown named schema {json_string!r}, known names: {sorted(names.names)!r}.")

    return PrimitiveSchema(data_type=json_string)


def _schema_from_json_array(json_array, names):
    """Builds a union schema from a JSON array of branch descriptors.

    :param Any json_array: Iterable of JSON descriptors, one per union branch.
    :param Any names: Tracker used while parsing the branches.
    :returns: The UnionSchema made of the parsed branches, in order.
    """
    # Branches are parsed lazily, when UnionSchema consumes the iterable.
    branches = (
        schema_from_json_data(json_data=branch_desc, names=names)
        for branch_desc in json_array
    )
    return UnionSchema(branches)


def _schema_from_json_object(json_object, names):
    """Builds a schema from a JSON object (dict) descriptor.

    The "type" property selects the kind of schema to build. Properties that are
    not listed in SCHEMA_RESERVED_PROPS are forwarded to the schema as other_props.

    :param Dict[str, Any] json_object: JSON object describing the schema.
    :param names: Tracker for named schemas.
    :returns: The schema built from the descriptor.
    :raises SchemaParseException: If the descriptor has no "type", if its type is not in
        VALID_TYPES, or if an array, map or error union descriptor lacks its "items",
        "values" or "declared_errors" property respectively.
    :raises ValueError: If the type is in VALID_TYPES but has no object form handled here.
    """
    data_type = json_object.get('type')
    if data_type is None:
        raise SchemaParseException(
            f'Avro schema JSON descriptor has no "type" property: {json_object!r}')

    other_props = dict(
        filter_keys_out(items=json_object, keys=SCHEMA_RESERVED_PROPS))

    if data_type in PRIMITIVE_TYPES:
        # FIXME should not ignore other properties
        result = PrimitiveSchema(data_type, other_props=other_props)

    elif data_type in NAMED_TYPES:
        name = json_object.get('name')
        # Fall back on the tracker's default namespace when none is declared.
        namespace = json_object.get('namespace', names.default_namespace)
        if data_type == FIXED:
            size = json_object.get('size')
            result = FixedSchema(name, namespace, size, names, other_props)
        elif data_type == ENUM:
            symbols = json_object.get('symbols')
            doc = json_object.get('doc')
            result = EnumSchema(name, namespace, symbols, names, doc, other_props)

        elif data_type in [RECORD, ERROR]:
            field_desc_list = json_object.get('fields', ())

            # The record schema receives a callback and decides itself when to
            # build its fields, and with which names tracker.
            def MakeFields(names):
                return tuple(RecordSchema.make_field_list(field_desc_list, names))

            result = RecordSchema(
                name=name,
                namespace=namespace,
                make_fields=MakeFields,
                names=names,
                record_type=data_type,
                doc=json_object.get('doc'),
                other_props=other_props,
            )
        else:
            raise ValueError(f'Internal error: unknown type {data_type!r}.')

    elif data_type in VALID_TYPES:
        # Unnamed, non-primitive Avro type:

        if data_type == ARRAY:
            items_desc = json_object.get('items')
            if items_desc is None:
                raise SchemaParseException(f'Invalid array schema descriptor with no "items" : {json_object!r}.')
            result = ArraySchema(
                items=schema_from_json_data(items_desc, names),
                other_props=other_props,
            )

        elif data_type == MAP:
            values_desc = json_object.get('values')
            if values_desc is None:
                raise SchemaParseException(f'Invalid map schema descriptor with no "values" : {json_object!r}.')
            result = MapSchema(
                values=schema_from_json_data(values_desc, names=names),
                other_props=other_props,
            )

        elif data_type == ERROR_UNION:
            error_desc_list = json_object.get('declared_errors')
            # Validate with an explicit exception rather than an assert: asserts are
            # stripped under -O, and a bad descriptor is a parse error like the
            # missing "items"/"values" cases above.
            if error_desc_list is None:
                raise SchemaParseException(
                    f'Invalid error union schema descriptor with no "declared_errors" : {json_object!r}.')
            error_schemas = map(
                lambda desc: schema_from_json_data(desc, names=names),
                error_desc_list)
            result = ErrorUnionSchema(schemas=error_schemas)

        else:
            raise ValueError(f'Internal error: unknown type {data_type!r}.')
    else:
        raise SchemaParseException(f'Invalid JSON descriptor for an Avro schema: {json_object!r}')
    return result


# Parsers for the JSON data types, keyed by the Python type of the decoded JSON value.
# schema_from_json_data looks the parser up with type(json_data), so only these
# exact types are matched.
_JSONDataParserTypeMap = {
    str: _schema_from_json_string,
    list: _schema_from_json_array,
    dict: _schema_from_json_object,
}


def schema_from_json_data(json_data, names=None):
    """Builds an Avro Schema from its JSON descriptor.
    Raises SchemaParseException if the descriptor is invalid.

    :param Any json_data: Decoded JSON data (string, list or dict) describing the Avro schema.
    :param Any names: Optional tracker for Avro named schemas. A new one is created when omitted.
    :returns: The Avro schema parsed from the JSON descriptor.
    :rtype: Any
    """
    names = Names() if names is None else names

    # Dispatch on the exact Python type of the decoded JSON value.
    parser = _JSONDataParserTypeMap.get(type(json_data))
    if parser is not None:
        return parser(json_data, names=names)

    raise SchemaParseException(
        f'Invalid JSON descriptor for an Avro schema: {json_data!r}.')


# ------------------------------------------------------------------------------


def parse(json_string):
    """Constructs a Schema from its JSON descriptor in text form.
    Raises SchemaParseException if a JSON parsing error is met, or if the JSON descriptor is invalid.

    :param str json_string: String representation of the JSON descriptor of the schema.
    :returns: The parsed schema.
    :rtype: Any
    """
    try:
        descriptor = json.loads(json_string)
    except Exception as exn:
        # Any decoding failure is reported as a schema parse error, chained to its cause.
        raise SchemaParseException(
            f'Error parsing schema from JSON: {json_string!r}. '
            f'Error message: {exn!r}.') from exn

    # Construct the Avro Schema object, starting from a fresh names tracker.
    return schema_from_json_data(descriptor, Names())
