feast_spark.pyspark.launchers.gcloud package

Submodules

feast_spark.pyspark.launchers.gcloud.dataproc module

class feast_spark.pyspark.launchers.gcloud.dataproc.DataprocBatchIngestionJob(job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None], project: str, region: str)[source]

Bases: DataprocJobMixin, BatchIngestionJob

Batch Ingestion job result for a Dataproc cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Returns an empty string if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

class feast_spark.pyspark.launchers.gcloud.dataproc.DataprocClusterLauncher(cluster_name: str, staging_location: str, region: str, project_id: str, executor_instances: str, executor_cores: str, executor_memory: str)[source]

Bases: JobLauncher

Submits jobs to an existing Dataproc cluster. Depends on google-cloud-dataproc and google-cloud-storage, which are optional dependencies that the user has to install in addition to the Feast SDK.

EXTERNAL_JARS = ['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar']
FEATURE_TABLE_LABEL_KEY = 'feast_feature_tables'
JOB_HASH_LABEL_KEY = 'feast_job_hash'
JOB_TYPE_LABEL_KEY = 'feast_job_type'
PROJECT_LABEL_KEY = 'feast_project'
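
For orientation, constructing the launcher looks roughly like the sketch below. All values shown (cluster name, bucket, region, project, executor sizing) are placeholders; note that the executor settings are passed as strings, as in the constructor signature above.

    from feast_spark.pyspark.launchers.gcloud.dataproc import DataprocClusterLauncher

    # All values below are illustrative placeholders.
    launcher = DataprocClusterLauncher(
        cluster_name="my-dataproc-cluster",                  # existing Dataproc cluster
        staging_location="gs://my-bucket/dataproc-staging",  # GCS staging path
        region="us-central1",
        project_id="my-gcp-project",
        executor_instances="2",   # Spark executor settings are strings
        executor_cores="4",
        executor_memory="4g",
    )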
dataproc_cancel(job_id)[source]
dataproc_submit(job_params: SparkJobParameters, extra_properties: Dict[str, str]) Tuple[Job, Callable[[], Job], Callable[[], None]][source]
get_job_by_id(job_id: str) SparkJob[source]
historical_feature_retrieval(job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job that returns the file URI of the result file.

Return type

RetrievalJob
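
As an illustration only, a submitted retrieval job is typically driven as in the sketch below; launcher is a DataprocClusterLauncher and retrieval_params is assumed to be a RetrievalJobParameters built elsewhere and is not shown here.

    # Hypothetical usage; retrieval_params is assumed to be constructed elsewhere.
    job = launcher.historical_feature_retrieval(retrieval_params)

    # Wait for completion and fetch the URI of the result file; raises
    # SparkJobFailure on submission error, execution error, or timeout.
    output_uri = job.get_output_file_uri(timeout_sec=1800)
    print(output_uri)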

list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]
offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job that can be used to check when the job has completed.

Return type

BatchIngestionJob
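
A minimal sketch of submitting and inspecting a batch ingestion job, assuming ingestion_params is a BatchIngestionJobParameters built by the caller:

    batch_job = launcher.offline_to_online_ingestion(ingestion_params)

    print(batch_job.get_id())             # Dataproc job id
    print(batch_job.get_status())         # current SparkJobStatus
    print(batch_job.get_feature_table())  # feature table name, or "" if unknown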

schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job that can be used to check when the job has completed.

Return type

ScheduledBatchIngestionJob
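
A sketch of scheduling a recurring ingestion, assuming scheduled_params is a ScheduledBatchIngestionJobParameters built by the caller (its fields are not shown here); the matching unschedule call appears under unschedule_offline_to_online_ingestion below.

    # Hypothetical usage; scheduled_params is assumed to be constructed elsewhere.
    launcher.schedule_offline_to_online_ingestion(scheduled_params)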

start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job on a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job.

Return type

StreamIngestionJob
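
A sketch of starting and later stopping a streaming ingestion job, assuming stream_params is a StreamIngestionJobParameters built by the caller:

    stream_job = launcher.start_stream_to_online_ingestion(stream_params)

    print(stream_job.get_hash())    # consistent hash identifying this job
    print(stream_job.get_status())

    # Streaming jobs run until terminated explicitly.
    stream_job.cancel()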

unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.
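
A one-line sketch with placeholder project and feature table names:

    launcher.unschedule_offline_to_online_ingestion(
        project="my_project", feature_table="driver_hourly_stats"
    )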

class feast_spark.pyspark.launchers.gcloud.dataproc.DataprocJobMixin(job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None], project: str, region: str)[source]

Bases: object

block_polling(interval_sec=30, timeout_sec=3600) SparkJobStatus[source]

Blocks until the Dataproc job has completed or failed.

Parameters
  • interval_sec (int) – Polling interval in seconds.

  • timeout_sec (int) – Timeout limit in seconds.

Returns

Latest job status

Return type

SparkJobStatus

Raises

SparkJobFailure – Raised if the job has neither completed nor failed within the timeout limit.
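
A sketch of waiting for a terminal state, assuming job is any of the Dataproc job wrappers on this page and that SparkJobStatus and SparkJobFailure can be imported from feast_spark.pyspark.abc (the import path and the COMPLETED member name are assumptions):

    from feast_spark.pyspark.abc import SparkJobFailure, SparkJobStatus  # assumed path

    try:
        # Poll Dataproc every 60 seconds, for at most one hour.
        status = job.block_polling(interval_sec=60, timeout_sec=3600)
        if status == SparkJobStatus.COMPLETED:  # assumed enum member name
            print("job finished successfully")
    except SparkJobFailure:
        # Raised when the job is still running after timeout_sec.
        print("job did not reach a terminal state in time")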

cancel()[source]

Manually terminates the job.

get_error_message() Optional[str][source]

Getter for the job’s error message if applicable.

Returns

Status detail of the job, or None if the job was successful.

Return type

Optional[str]

get_id() str[source]

Getter for the job id.

Returns

Dataproc job id.

Return type

str

get_log_uri() Optional[str][source]
get_start_time()[source]
get_status() SparkJobStatus[source]

Retrieves the current job status.

Returns

Job status

Return type

SparkJobStatus

class feast_spark.pyspark.launchers.gcloud.dataproc.DataprocRetrievalJob(job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None], project: str, region: str, output_file_uri: str)[source]

Bases: DataprocJobMixin, RetrievalJob

Historical feature retrieval job result for a Dataproc cluster

get_output_file_uri(timeout_sec=None, block=True)[source]

Get the output file URI of the result file. This method will block until the job succeeds, and raises an exception if the job does not complete successfully within the timeout.

Parameters
  • timeout_sec (int) – Maximum number of seconds to wait for the job to finish. If timeout_sec is exceeded or the job fails, an exception is raised.

  • block (bool) – If False, do not block until the job is done; in that case the timeout_sec parameter is ignored.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

File URI of the result file.

Return type

str
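
For contrast with the blocking call shown under historical_feature_retrieval above, a non-blocking sketch; retrieval_job is an assumed DataprocRetrievalJob instance:

    # With block=False the call returns immediately; the result file may not
    # exist yet, so check the job status separately before reading from the URI.
    uri = retrieval_job.get_output_file_uri(block=False)
    print(uri, retrieval_job.get_status())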

class feast_spark.pyspark.launchers.gcloud.dataproc.DataprocStreamingIngestionJob(job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None], project: str, region: str, job_hash: str)[source]

Bases: DataprocJobMixin, StreamIngestionJob

Streaming Ingestion job result for a Dataproc cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Returns None if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

get_hash() str[source]

Gets the consistent hash of this stream ingestion job.

The hash needs to be persisted at the data processing layer, so that we can get the same hash when retrieving the job from Spark.

Returns

The hash for this streaming ingestion job

Return type

str
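
A hedged sketch of using the hash to find an already-running copy of a streaming job through the launcher; launcher and stream_job are assumed to exist, and whether the hash is persisted via the feast_job_hash Dataproc label (JOB_HASH_LABEL_KEY above) is an assumption, not confirmed here.

    from feast_spark.pyspark.launchers.gcloud.dataproc import DataprocStreamingIngestionJob

    # Hypothetical dedup check: look for a live streaming job with the same hash.
    running = [
        j for j in launcher.list_jobs(include_terminated=False)
        if isinstance(j, DataprocStreamingIngestionJob)
        and j.get_hash() == stream_job.get_hash()
    ]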

Module contents

class feast_spark.pyspark.launchers.gcloud.DataprocClusterLauncher(cluster_name: str, staging_location: str, region: str, project_id: str, executor_instances: str, executor_cores: str, executor_memory: str)[source]

Bases: JobLauncher

Submits jobs to an existing Dataproc cluster. Depends on google-cloud-dataproc and google-cloud-storage, which are optional dependencies that the user has to install in addition to the Feast SDK.

EXTERNAL_JARS = ['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar']
FEATURE_TABLE_LABEL_KEY = 'feast_feature_tables'
JOB_HASH_LABEL_KEY = 'feast_job_hash'
JOB_TYPE_LABEL_KEY = 'feast_job_type'
PROJECT_LABEL_KEY = 'feast_project'
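
Since the launcher classes documented in the dataproc submodule are also exposed here at the package level, both import paths below are expected to name the same class (a reasonable inference from this page, not verified here):

    from feast_spark.pyspark.launchers.gcloud import DataprocClusterLauncher
    from feast_spark.pyspark.launchers.gcloud.dataproc import (
        DataprocClusterLauncher as DataprocClusterLauncherSubmodule,
    )

    # Both names are expected to refer to the same class object.
    assert DataprocClusterLauncher is DataprocClusterLauncherSubmodule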
dataproc_cancel(job_id)[source]
dataproc_submit(job_params: SparkJobParameters, extra_properties: Dict[str, str]) Tuple[Job, Callable[[], Job], Callable[[], None]][source]
get_job_by_id(job_id: str) SparkJob[source]
historical_feature_retrieval(job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job that returns the file URI of the result file.

Return type

RetrievalJob

list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]
offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job that can be used to check when the job has completed.

Return type

BatchIngestionJob

schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job that can be used to check when the job has completed.

Return type

ScheduledBatchIngestionJob

start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job on a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

Wrapper around the remote job.

Return type

StreamIngestionJob

unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.

class feast_spark.pyspark.launchers.gcloud.DataprocRetrievalJob(job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None], project: str, region: str, output_file_uri: str)[source]

Bases: DataprocJobMixin, RetrievalJob

Historical feature retrieval job result for a Dataproc cluster

get_output_file_uri(timeout_sec=None, block=True)[source]

Get the output file URI of the result file. This method will block until the job succeeds, and raises an exception if the job does not complete successfully within the timeout.

Parameters
  • timeout_sec (int) – Maximum number of seconds to wait for the job to finish. If timeout_sec is exceeded or the job fails, an exception is raised.

  • block (bool) – If False, do not block until the job is done; in that case the timeout_sec parameter is ignored.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

File URI of the result file.

Return type

str