Either the Kafka emitter or the REST emitter can be used to push metadata to DataHub. The DataHub graph client extends the REST emitter with additional functionality.

class datahub.emitter.rest_emitter.DataHubRestEmitter(gms_server, token=None, connect_timeout_sec=None, read_timeout_sec=None, retry_status_codes=None, retry_methods=None, retry_max_times=None, extra_headers=None, ca_certificate_path=None, server_telemetry_id=None, disable_ssl_verification=False)

Bases: Closeable

Parameters:
  • gms_server (str)

  • token (Optional[str])

  • connect_timeout_sec (Optional[float])

  • read_timeout_sec (Optional[float])

  • retry_status_codes (Optional[List[int]])

  • retry_methods (Optional[List[str]])

  • retry_max_times (Optional[int])

  • extra_headers (Optional[Dict[str, str]])

  • ca_certificate_path (Optional[str])

  • server_telemetry_id (Optional[str])

  • disable_ssl_verification (bool)

test_connection()
Return type:

dict

emit(item, callback=None)
Parameters:
Return type:

Tuple[datetime, datetime]

emit_mce(mce)
Parameters:

mce (MetadataChangeEventClass)

Return type:

None

emit_mcp(mcp)
Parameters:

mcp (Union[MetadataChangeProposalClass, MetadataChangeProposalWrapper])

Return type:

None

emit_usage(usageStats)
Parameters:

usageStats (UsageAggregationClass)

Return type:

None

close()
Return type:

None

datahub.emitter.rest_emitter.DatahubRestEmitter

alias of DataHubRestEmitter

class datahub.emitter.kafka_emitter.KafkaEmitterConfig(**data)

Bases: ConfigModel

Parameters:
  • data (Any)

  • connection (KafkaProducerConnectionConfig)

  • topic_routes (Dict[str, str])

connection: KafkaProducerConnectionConfig
topic_routes: Dict[str, str]

classmethod validate_topic_routes(v)
Parameters:

v (Dict[str, str])

Return type:

Dict[str, str]

class datahub.emitter.kafka_emitter.DatahubKafkaEmitter(config)

Bases: Closeable

Parameters:

config (KafkaEmitterConfig)

emit(item, callback=None)
Parameters:
Return type:

None

emit_mce_async(mce, callback)
Parameters:
Return type:

None

emit_mcp_async(mcp, callback)
Parameters:
Return type:

None

flush()
Return type:

None

close()
Return type:

None
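
The asynchronous methods above only queue a message, so a caller usually needs a callback plus a final `flush()` to learn what was actually delivered. The sketch below shows one way to collect failures. The helper name `emit_batch_async` is hypothetical, and the callback shape `(error, message)` is an assumption, since the callback parameters are not documented in this reference.

```python
from typing import Any, Iterable, List, Optional, Tuple


def emit_batch_async(emitter: Any, mcps: Iterable[Any]) -> List[Tuple[Exception, str]]:
    """Queue every proposal, flush once, and return the reported failures."""
    failures: List[Tuple[Exception, str]] = []

    def on_delivery(err: Optional[Exception], msg: Any) -> None:
        # Assumed callback shape: (error or None, message or description).
        if err is not None:
            failures.append((err, str(msg)))

    for mcp in mcps:
        emitter.emit_mcp_async(mcp, on_delivery)

    # Wait for the queued messages to be delivered (or to fail).
    emitter.flush()
    return failures
```

Flushing once after the loop, rather than after each message, keeps the producer free to batch sends.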

class datahub.ingestion.graph.client.DatahubClientConfig(**data)

Bases: ConfigModel

Configuration class that holds the connection settings for DataHub GMS.

Parameters:
  • data (Any)

  • server (str)

  • token (str | None)

  • timeout_sec (int | None)

  • retry_status_codes (List[int] | None)

  • retry_max_times (int | None)

  • extra_headers (Dict[str, str] | None)

  • ca_certificate_path (str | None)

  • max_threads (int)

  • disable_ssl_verification (bool)

server: str
token: Optional[str]
timeout_sec: Optional[int]
retry_status_codes: Optional[List[int]]
retry_max_times: Optional[int]
extra_headers: Optional[Dict[str, str]]
ca_certificate_path: Optional[str]
max_threads: int
disable_ssl_verification: bool

datahub.ingestion.graph.client.DataHubGraphConfig

alias of DatahubClientConfig

class datahub.ingestion.graph.client.DataHubGraph(config)

Bases: DataHubRestEmitter

Parameters:

config (DatahubClientConfig)

get_aspect(entity_urn, aspect_type, version=0)

Get an aspect for an entity.

Parameters:
  • entity_urn (str) – The urn of the entity

  • aspect_type (Type[TypeVar(Aspect, bound= _Aspect)]) – The type class of the aspect being requested (e.g. datahub.metadata.schema_classes.DatasetProperties)

  • version (int) – The version of the aspect to retrieve. The default of 0 means latest. Versions > 0 go from oldest to newest, so 1 is the oldest.

Return type:

Optional[TypeVar(Aspect, bound= _Aspect)]

Returns:

the Aspect as a dictionary if present, None if no aspect was found (HTTP status 404)

Raises:
  • TypeError – if the aspect type is a timeseries aspect

  • HttpError – if the HTTP response is not a 200 or a 404
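
The version numbering described above (0 is the latest, 1 is the oldest) can be sketched with a small helper that fetches both ends of an aspect's history. The name `latest_and_oldest` is hypothetical, and the graph client is passed in so that the sketch does not need a live server.

```python
from typing import Any, Optional, Tuple, Type


def latest_and_oldest(
    graph: Any, entity_urn: str, aspect_type: Type[Any]
) -> Tuple[Optional[Any], Optional[Any]]:
    """Return (latest, oldest) versions of an aspect, or (None, None) if absent."""
    # version=0 is the default and means "latest".
    latest = graph.get_aspect(entity_urn, aspect_type, version=0)
    if latest is None:
        return None, None
    # version=1 is the oldest stored version; it may be None if none is stored.
    oldest = graph.get_aspect(entity_urn, aspect_type, version=1)
    return latest, oldest
```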

get_aspect_v2(entity_urn, aspect_type, aspect, aspect_type_name=None, version=0)
Parameters:
  • entity_urn (str)

  • aspect_type (Type[TypeVar(Aspect, bound= _Aspect)])

  • aspect (str)

  • aspect_type_name (Optional[str])

  • version (int)

Return type:

Optional[TypeVar(Aspect, bound= _Aspect)]

get_config()
Return type:

Dict[str, Any]

get_ownership(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[OwnershipClass]

get_schema_metadata(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[SchemaMetadataClass]

get_domain_properties(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[DomainPropertiesClass]

get_dataset_properties(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[DatasetPropertiesClass]

get_tags(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[GlobalTagsClass]

get_glossary_terms(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[GlossaryTermsClass]

get_domain(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[DomainsClass]

get_browse_path(entity_urn)
Parameters:

entity_urn (str)

Return type:

Optional[BrowsePathsClass]

get_usage_aspects_from_urn(entity_urn, start_timestamp, end_timestamp)
Parameters:
  • entity_urn (str)

  • start_timestamp (int)

  • end_timestamp (int)

Return type:

Optional[List[DatasetUsageStatisticsClass]]

list_all_entity_urns(entity_type, start, count)
Parameters:
  • entity_type (str)

  • start (int)

  • count (int)

Return type:

Optional[List[str]]
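
Because `list_all_entity_urns` takes `start` and `count`, reading every URN means paging. The sketch below is one possible loop, assuming that `start` is a zero-based offset and `count` is the page size (neither is stated in this reference). The helper name `iter_entity_urns` is hypothetical.

```python
from typing import Any, Iterator


def iter_entity_urns(graph: Any, entity_type: str, page_size: int = 100) -> Iterator[str]:
    """Yield every URN of `entity_type`, one page at a time."""
    start = 0
    while True:
        page = graph.list_all_entity_urns(entity_type, start, page_size)
        if not page:
            # None or an empty list: nothing more to read.
            return
        yield from page
        if len(page) < page_size:
            # A short page is treated as the last one.
            return
        start += len(page)
```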

get_latest_timeseries_value(entity_urn, aspect_type, filter_criteria_map)
Parameters:
  • entity_urn (str)

  • aspect_type (Type[TypeVar(Aspect, bound= _Aspect)])

  • filter_criteria_map (Dict[str, str])

Return type:

Optional[TypeVar(Aspect, bound= _Aspect)]

get_entity_raw(entity_urn, aspects=None)
Parameters:
  • entity_urn (str)

  • aspects (Optional[List[str]])

Return type:

Dict

get_aspects_for_entity(entity_urn, aspects, aspect_types)

Get multiple aspects for an entity. To get a single aspect for an entity, use the get_aspect_v2 method.

Warning: Do not use this method to determine whether an entity exists. It always returns an entity, even if the entity does not exist. This is caused by how the DataHub server responds to these calls and will be resolved automatically once the server-side issue is fixed.

Parameters:
  • entity_urn (str) – The urn of the entity

  • aspects (List[str]) – List of aspect names being requested (e.g. [schemaMetadata, datasetProperties])

  • aspect_types (List[Type[TypeVar(Aspect, bound= _Aspect)]]) – List of aspect type classes being requested (e.g. [datahub.metadata.schema_classes.DatasetProperties])

Return type:

Dict[str, Optional[TypeVar(Aspect, bound= _Aspect)]]

Returns:

A map of aspect_name to aspect_value as a dictionary, if present; aspect_value is set to None for any aspect that was not found. Returns None on HTTP status 404.

Raises:

HttpError – if the HTTP response is not a 200
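
Since missing aspects come back as None and the whole result may itself be None, callers often want to keep only the aspects that were found. A minimal sketch follows. The helper name `found_aspects` is hypothetical, and it assumes that `aspects` and `aspect_types` are parallel lists of equal length, which this reference does not state.

```python
from typing import Any, Dict, List, Type


def found_aspects(
    graph: Any, entity_urn: str, aspects: List[str], aspect_types: List[Type[Any]]
) -> Dict[str, Any]:
    """Return only the aspects that were actually found for the entity."""
    # Assumption: names and type classes are matched position by position.
    if len(aspects) != len(aspect_types):
        raise ValueError("aspects and aspect_types must have the same length")
    result = graph.get_aspects_for_entity(
        entity_urn=entity_urn, aspects=aspects, aspect_types=aspect_types
    )
    # Treat a None result like an empty map, then drop aspects that are None.
    return {name: value for name, value in (result or {}).items() if value is not None}
```

An empty result here does not prove that the entity is missing; it only means that none of the requested aspects were returned.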

get_domain_urn_by_name(domain_name)

Retrieve a domain urn based on its name. Returns None if no match is found.

Parameters:

domain_name (str)

Return type:

Optional[str]

get_container_urns_by_filter(env=None, search_query='*')

Return the urns of the containers that match the search query.

Parameters:
  • env (Optional[str])

  • search_query (str)

Return type:

Iterable[str]

get_urns_by_filter(platform, batch_size=10000)
Parameters:
  • platform (str)

  • batch_size (int)

Return type:

Iterable[str]

get_latest_pipeline_checkpoint(pipeline_name, platform)
Parameters:
  • pipeline_name (str)

  • platform (str)

Return type:

Optional[Checkpoint[GenericCheckpointState]]

get_search_results(start=0, count=1, entity='dataset')
Parameters:
  • start (int)

  • count (int)

  • entity (str)

Return type:

Dict

get_aspect_counts(aspect, urn_like=None)
Parameters:
  • aspect (str)

  • urn_like (Optional[str])

Return type:

int

execute_graphql(query, variables=None)
Parameters:
  • query (str)

  • variables (Optional[Dict])

Return type:

Dict
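
A call to `execute_graphql` pairs a query string with an optional variables dictionary. The sketch below is illustrative only: the helper name `fetch_dataset` is hypothetical, and the query text assumes that the server's GraphQL schema exposes a `dataset(urn:)` field, which this reference does not confirm.

```python
from typing import Any, Dict

# Illustrative query; the field names are assumptions about the server schema.
DATASET_QUERY = """
query getDataset($urn: String!) {
  dataset(urn: $urn) {
    urn
  }
}
"""


def fetch_dataset(graph: Any, urn: str) -> Dict:
    """Run the query above with `urn` bound to the $urn variable."""
    # Passing values through `variables` avoids string-formatting the query.
    return graph.execute_graphql(DATASET_QUERY, variables={"urn": urn})
```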

class RelationshipDirection(value)

Bases: str, Enum

An enumeration.

INCOMING = 'INCOMING'
OUTGOING = 'OUTGOING'

class RelatedEntity(urn, relationship_type)

Bases: object

Parameters:
  • urn (str)

  • relationship_type (str)

urn: str
relationship_type: str
Parameters:
Return type:

Iterable[RelatedEntity]

soft_delete_urn(urn, run_id='soft-delete-urns')
Parameters:
  • urn (str)

  • run_id (str)

Return type:

None
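
`soft_delete_urn` can be combined with `get_urns_by_filter` to soft-delete whatever URNs the filter returns for a platform. The sketch below is one cautious way to do that. The helper name `soft_delete_platform` and its `dry_run` flag are hypothetical and not part of the library.

```python
from typing import Any, List


def soft_delete_platform(
    graph: Any, platform: str, dry_run: bool = True, run_id: str = "soft-delete-urns"
) -> List[str]:
    """Soft-delete every URN returned for `platform`; return the URNs considered."""
    # Materialise the iterable so the same list can be reported and deleted.
    urns = list(graph.get_urns_by_filter(platform))
    if not dry_run:
        for urn in urns:
            graph.soft_delete_urn(urn, run_id=run_id)
    return urns
```

Defaulting `dry_run` to True means a first call only reports what would be affected.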

datahub.ingestion.graph.client.get_default_graph()
Return type:

DataHubGraph