feast_spark.pyspark.launchers.aws package

Submodules

feast_spark.pyspark.launchers.aws.emr module

class feast_spark.pyspark.launchers.aws.emr.EmrBatchIngestionJob(emr_client, job_ref: EmrJobRef, project: str, table_name: str)[source]

Bases: EmrJobMixin, BatchIngestionJob

Ingestion job result for a EMR cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Return empty string if unable to determine the feature table, such as when the job is created by the earlier version of Feast.

Returns

Feature table name

Return type

str

get_project() str[source]
class feast_spark.pyspark.launchers.aws.emr.EmrClusterLauncher(*, region: str, existing_cluster_id: Optional[str], new_cluster_template_path: Optional[str], staging_location: str, emr_log_location: str)[source]

Bases: JobLauncher

Submits jobs to an existing or new EMR cluster. Requires boto3 as an additional dependency.

get_job_by_id(job_id: str) SparkJob[source]

Find EMR job by a string id. Note that it will also return terminated jobs.

Raises

KeyError if the job not found.

historical_feature_retrieval(job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

wrapper around remote job that returns file uri to the result file.

Return type

RetrievalJob

list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]

Find EMR job by a string id.

Parameters
  • include_terminated – whether to include terminated jobs.

  • table_name – FeatureTable name to filter by

Returns

A list of SparkJob instances.

offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

wrapper around remote job that can be used to check when job completed.

Return type

BatchIngestionJob

schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

wrapper around remote job that can be used to check when job completed.

Return type

ScheduledBatchIngestionJob

start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job on a Spark cluster.

Returns

wrapper around remote job that can be used to check on the job.

Return type

StreamIngestionJob

unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.

class feast_spark.pyspark.launchers.aws.emr.EmrJobMixin(emr_client, job_ref: EmrJobRef)[source]

Bases: object

cancel()[source]
get_id() str[source]
get_start_time() datetime[source]
get_status() SparkJobStatus[source]
class feast_spark.pyspark.launchers.aws.emr.EmrRetrievalJob(emr_client, job_ref: EmrJobRef, output_file_uri: str)[source]

Bases: EmrJobMixin, RetrievalJob

Historical feature retrieval job result for a EMR cluster

get_output_file_uri(timeout_sec=None, block=True)[source]

Get output file uri to the result file. This method will block until the job succeeded, or if the job didn’t execute successfully within timeout.

Parameters
  • timeout_sec (int) – Max no of seconds to wait until job is done. If “timeout_sec” is exceeded or if the job fails, an exception will be raised.

  • block (bool) – If false, don’t block until the job is done. If block=True, timeout parameter is ignored.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

file uri to the result file.

Return type

str

class feast_spark.pyspark.launchers.aws.emr.EmrStreamIngestionJob(emr_client, job_ref: EmrJobRef, job_hash: str, project: str, table_name: str)[source]

Bases: EmrJobMixin, StreamIngestionJob

Ingestion streaming job for a EMR cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Return None if unable to determine the feature table, such as when the job is created by the earlier version of Feast.

Returns

Feature table name

Return type

str

get_hash() str[source]

Gets the consistent hash of this stream ingestion job.

The hash needs to be persisted at the data processing layer, so that we can get the same hash when retrieving the job from Spark.

Returns

The hash for this streaming ingestion job

Return type

str

feast_spark.pyspark.launchers.aws.emr_utils module

class feast_spark.pyspark.launchers.aws.emr_utils.EmrJobRef(cluster_id: str, step_id: Optional[str])[source]

Bases: tuple

EMR job reference. step_id can be None when using on-demand clusters, in that case each cluster has only one step

property cluster_id

Alias for field number 0

property step_id

Alias for field number 1

class feast_spark.pyspark.launchers.aws.emr_utils.JobInfo(job_ref, job_type, state, project, table_name, output_file_uri, job_hash)[source]

Bases: tuple

property job_hash

Alias for field number 6

property job_ref

Alias for field number 0

property job_type

Alias for field number 1

property output_file_uri

Alias for field number 5

property project

Alias for field number 3

property state

Alias for field number 2

property table_name

Alias for field number 4

Module contents

class feast_spark.pyspark.launchers.aws.EmrBatchIngestionJob(emr_client, job_ref: EmrJobRef, project: str, table_name: str)[source]

Bases: EmrJobMixin, BatchIngestionJob

Ingestion job result for a EMR cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Return empty string if unable to determine the feature table, such as when the job is created by the earlier version of Feast.

Returns

Feature table name

Return type

str

get_project() str[source]
class feast_spark.pyspark.launchers.aws.EmrClusterLauncher(*, region: str, existing_cluster_id: Optional[str], new_cluster_template_path: Optional[str], staging_location: str, emr_log_location: str)[source]

Bases: JobLauncher

Submits jobs to an existing or new EMR cluster. Requires boto3 as an additional dependency.

get_job_by_id(job_id: str) SparkJob[source]

Find EMR job by a string id. Note that it will also return terminated jobs.

Raises

KeyError if the job not found.

historical_feature_retrieval(job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

wrapper around remote job that returns file uri to the result file.

Return type

RetrievalJob

list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]

Find EMR job by a string id.

Parameters
  • include_terminated – whether to include terminated jobs.

  • table_name – FeatureTable name to filter by

Returns

A list of SparkJob instances.

offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

wrapper around remote job that can be used to check when job completed.

Return type

BatchIngestionJob

schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

wrapper around remote job that can be used to check when job completed.

Return type

ScheduledBatchIngestionJob

start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job on a Spark cluster.

Returns

wrapper around remote job that can be used to check on the job.

Return type

StreamIngestionJob

unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.

class feast_spark.pyspark.launchers.aws.EmrRetrievalJob(emr_client, job_ref: EmrJobRef, output_file_uri: str)[source]

Bases: EmrJobMixin, RetrievalJob

Historical feature retrieval job result for a EMR cluster

get_output_file_uri(timeout_sec=None, block=True)[source]

Get output file uri to the result file. This method will block until the job succeeded, or if the job didn’t execute successfully within timeout.

Parameters
  • timeout_sec (int) – Max no of seconds to wait until job is done. If “timeout_sec” is exceeded or if the job fails, an exception will be raised.

  • block (bool) – If false, don’t block until the job is done. If block=True, timeout parameter is ignored.

Raises

SparkJobFailure – The spark job submission failed, encountered error during execution, or timeout.

Returns

file uri to the result file.

Return type

str

class feast_spark.pyspark.launchers.aws.EmrStreamIngestionJob(emr_client, job_ref: EmrJobRef, job_hash: str, project: str, table_name: str)[source]

Bases: EmrJobMixin, StreamIngestionJob

Ingestion streaming job for a EMR cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Return None if unable to determine the feature table, such as when the job is created by the earlier version of Feast.

Returns

Feature table name

Return type

str

get_hash() str[source]

Gets the consistent hash of this stream ingestion job.

The hash needs to be persisted at the data processing layer, so that we can get the same hash when retrieving the job from Spark.

Returns

The hash for this streaming ingestion job

Return type

str