feast_spark.pyspark.launchers.standalone package

Submodules

feast_spark.pyspark.launchers.standalone.local module

class feast_spark.pyspark.launchers.standalone.local.JobCache[source]

Bases: object

A global in-memory cache of Spark jobs.

This is necessary because, in local mode, there is no external state for tracking running Spark jobs (unlike EMR and Dataproc, which track running jobs for us).

add_job(job: SparkJob) None[source]

Add a Spark job to the cache.

Parameters

job (SparkJob) – The new Spark job to add.

get_job_by_id(job_id: str) SparkJob[source]

Get the Spark job with the given ID. Raises an exception if no such job exists.

Parameters

job_id (str) – External ID of the Spark job to get.

Returns

The Spark job with the given ID.

Return type

SparkJob

hash_by_id: Dict[str, Optional[str]]
job_by_id: Dict[str, SparkJob]
list_jobs() List[SparkJob][source]

List all Spark jobs in the cache.

lock: RLock
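
A minimal usage sketch (the launcher normally maintains a single module-level cache; constructing a fresh JobCache here is only for illustration, and the job argument is assumed to be an already-created SparkJob):

    from feast_spark.pyspark.launchers.standalone.local import JobCache

    cache = JobCache()

    def track(job) -> None:
        """Register a newly submitted SparkJob so it can be looked up later."""
        cache.add_job(job)

    def find(job_id: str):
        """Look up a tracked job by its external ID; raises if the ID is unknown."""
        return cache.get_job_by_id(job_id)

    def running_jobs():
        """Enumerate every Spark job currently tracked in this process."""
        return cache.list_jobs()
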
class feast_spark.pyspark.launchers.standalone.local.StandaloneClusterBatchIngestionJob(job_id: str, job_name: str, process: Popen, ui_port: int, feature_table: str)[source]

Bases: StandaloneClusterJobMixin, BatchIngestionJob

Batch ingestion job result for a standalone Spark cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Returns an empty string if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

class feast_spark.pyspark.launchers.standalone.local.StandaloneClusterJobMixin(job_id: str, job_name: str, process: Popen, ui_port: Optional[int] = None)[source]

Bases: object

cancel()[source]
check_if_started()[source]
get_id() str[source]
get_start_time() datetime[source]
get_status() SparkJobStatus[source]
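
All of the standalone job classes below share these accessors through this mixin. A small sketch that summarizes such a job, assuming get_status() returns an enum-like SparkJobStatus member with a name attribute:

    def describe_job(job) -> str:
        """Summarize a standalone-cluster job via the mixin's accessors."""
        return (
            f"job {job.get_id()} started at {job.get_start_time().isoformat()} "
            f"with status {job.get_status().name}"
        )

    # job.cancel() stops the job by terminating the wrapped spark-submit process.
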
class feast_spark.pyspark.launchers.standalone.local.StandaloneClusterLauncher(master_url: str, spark_home: Optional[str] = None)[source]

Bases: JobLauncher

Submits jobs to a standalone Spark cluster in client mode.
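
For example, a launcher pointing at a standalone master could be constructed as follows (the master URL and Spark home below are placeholders, not values shipped with this package):

    from feast_spark.pyspark.launchers.standalone.local import StandaloneClusterLauncher

    launcher = StandaloneClusterLauncher(
        master_url="spark://localhost:7077",  # placeholder standalone master URL
        spark_home="/opt/spark",              # optional; defaults to None
    )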

get_job_by_id(job_id: str) SparkJob[source]
historical_feature_retrieval(job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job that returns the file URI of the result file.

Return type

RetrievalJob
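
A usage sketch, assuming job_params is an already-built RetrievalJobParameters supplied by the caller:

    from feast_spark.pyspark.launchers.standalone.local import StandaloneClusterLauncher

    def retrieve_features(launcher: StandaloneClusterLauncher, job_params) -> str:
        """Submit a historical retrieval job and return the URI of the result file.

        Raises SparkJobFailure if submission or execution fails or times out.
        """
        retrieval_job = launcher.historical_feature_retrieval(job_params)
        # Block for up to 30 minutes while the job writes its output file.
        return retrieval_job.get_output_file_uri(timeout_sec=1800)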

list_jobs(include_terminated: bool, project: Optional[str], table_name: Optional[str]) List[SparkJob][source]
offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job that can be used to check whether the job has completed.

Return type

BatchIngestionJob
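
A usage sketch, assuming ingestion_job_params is an already-built BatchIngestionJobParameters:

    from feast_spark.pyspark.launchers.standalone.local import StandaloneClusterLauncher

    def ingest_offline_to_online(launcher: StandaloneClusterLauncher, ingestion_job_params):
        """Submit a batch ingestion job and return the handle used to track it."""
        job = launcher.offline_to_online_ingestion(ingestion_job_params)
        # The returned BatchIngestionJob can be polled via get_status().
        print(f"submitted {job.get_id()} for feature table {job.get_feature_table()}")
        return job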

schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job that can be used to check whether the job has completed.

Return type

ScheduledBatchIngestionJob

spark_submit(job_params: SparkJobParameters, ui_port: Optional[int] = None) Popen[source]
property spark_submit_script_path
start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job.

Return type

StreamIngestionJob
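
A usage sketch, assuming ingestion_job_params is an already-built StreamIngestionJobParameters:

    from feast_spark.pyspark.launchers.standalone.local import StandaloneClusterLauncher

    def start_streaming_ingestion(launcher: StandaloneClusterLauncher, ingestion_job_params):
        """Start a stream ingestion job and report its identity and configuration hash."""
        job = launcher.start_stream_to_online_ingestion(ingestion_job_params)
        # Streaming jobs run until cancelled; the hash is used to detect configuration changes.
        print(f"stream job {job.get_id()} hash={job.get_hash()}")
        return job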

unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.

class feast_spark.pyspark.launchers.standalone.local.StandaloneClusterRetrievalJob(job_id: str, job_name: str, process: Popen, output_file_uri: str)[source]

Bases: StandaloneClusterJobMixin, RetrievalJob

Historical feature retrieval job result for a standalone Spark cluster

get_output_file_uri(timeout_sec: Optional[int] = None, block=True)[source]

Get the file URI of the result file. This method blocks until the job succeeds, or raises an exception if the job does not complete successfully within the timeout.

Parameters
  • timeout_sec (int) – Maximum number of seconds to wait for the job to finish. If timeout_sec is exceeded or the job fails, an exception is raised.

  • block (bool) – If False, do not block until the job is done; in that case timeout_sec has no effect.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

The file URI of the result file.

Return type

str
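
A non-blocking sketch, assuming that with block=False the call returns immediately instead of waiting for the job to finish:

    from feast_spark.pyspark.launchers.standalone.local import StandaloneClusterRetrievalJob

    def output_uri_if_submitted(job: StandaloneClusterRetrievalJob) -> str:
        """Return the output file URI without waiting for the job to finish."""
        return job.get_output_file_uri(block=False)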

class feast_spark.pyspark.launchers.standalone.local.StandaloneClusterStreamingIngestionJob(job_id: str, job_name: str, process: Popen, ui_port: int, job_hash: str, feature_table: str)[source]

Bases: StandaloneClusterJobMixin, StreamIngestionJob

Streaming ingestion job result for a standalone Spark cluster

get_feature_table() str[source]

Get the feature table name associated with this job. Returns an empty string if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

get_hash() str[source]

Gets the consistent hash of this stream ingestion job.

The hash must be persisted at the data processing layer so that the same hash can be recovered when the job is later retrieved from Spark.

Returns

The hash for this streaming ingestion job

Return type

str
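
The hash lets a caller detect whether a running stream job still matches the desired configuration. A sketch, where expected_hash is assumed to be the hash persisted by the caller when the job was first started:

    from feast_spark.pyspark.launchers.standalone.local import (
        StandaloneClusterStreamingIngestionJob,
    )

    def needs_restart(job: StandaloneClusterStreamingIngestionJob, expected_hash: str) -> bool:
        """Return True if the running stream job no longer matches the expected hash."""
        return job.get_hash() != expected_hash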

feast_spark.pyspark.launchers.standalone.local.reset_job_cache()[source]
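
This presumably resets the module's global in-memory JobCache, which is mainly useful for isolating tests. A hypothetical pytest fixture:

    import pytest

    from feast_spark.pyspark.launchers.standalone.local import reset_job_cache

    @pytest.fixture(autouse=True)
    def clean_job_cache():
        """Start and end every test with an empty in-memory job cache."""
        reset_job_cache()
        yield
        reset_job_cache()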

Module contents

class feast_spark.pyspark.launchers.standalone.StandaloneClusterLauncher(master_url: str, spark_home: Optional[str] = None)[source]

Bases: JobLauncher

Submits jobs to a standalone Spark cluster in client mode.

get_job_by_id(job_id: str) SparkJob[source]
historical_feature_retrieval(job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job that returns the file URI of the result file.

Return type

RetrievalJob

list_jobs(include_terminated: bool, project: Optional[str], table_name: Optional[str]) List[SparkJob][source]
offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job that can be used to check whether the job has completed.

Return type

BatchIngestionJob

schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job that can be used to check whether the job has completed.

Return type

ScheduledBatchIngestionJob

spark_submit(job_params: SparkJobParameters, ui_port: Optional[int] = None) Popen[source]
property spark_submit_script_path
start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

A wrapper around the remote job.

Return type

StreamIngestionJob

unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.

class feast_spark.pyspark.launchers.standalone.StandaloneClusterRetrievalJob(job_id: str, job_name: str, process: Popen, output_file_uri: str)[source]

Bases: StandaloneClusterJobMixin, RetrievalJob

Historical feature retrieval job result for a standalone Spark cluster

get_output_file_uri(timeout_sec: Optional[int] = None, block=True)[source]

Get the file URI of the result file. This method blocks until the job succeeds, or raises an exception if the job does not complete successfully within the timeout.

Parameters
  • timeout_sec (int) – Maximum number of seconds to wait for the job to finish. If timeout_sec is exceeded or the job fails, an exception is raised.

  • block (bool) – If False, do not block until the job is done; in that case timeout_sec has no effect.

Raises

SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.

Returns

The file URI of the result file.

Return type

str

feast_spark.pyspark.launchers.standalone.reset_job_cache()[source]
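
The package level re-exports the launcher, the retrieval job class, and reset_job_cache, so the shorter import path can be used end to end. A sketch, again assuming job_params is a pre-built RetrievalJobParameters:

    from feast_spark.pyspark.launchers.standalone import (
        StandaloneClusterLauncher,
        reset_job_cache,
    )

    def run_once(master_url: str, job_params) -> str:
        """Create a launcher, run a single retrieval job, and return the result URI."""
        reset_job_cache()  # assumed to clear the in-memory job cache before starting
        launcher = StandaloneClusterLauncher(master_url=master_url)
        job = launcher.historical_feature_retrieval(job_params)
        return job.get_output_file_uri(timeout_sec=1800)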