feast_spark.pyspark package
Submodules
feast_spark.pyspark.abc module
- class feast_spark.pyspark.abc.BatchIngestionJob[source]
Bases:
SparkJob
Container for the batch ingestion job result.
- class feast_spark.pyspark.abc.BatchIngestionJobParameters(feature_table: Dict, source: Dict, start: datetime, end: datetime, jar: str, redis_host: Optional[str], redis_port: Optional[int], redis_password: Optional[str], redis_ssl: Optional[bool], bigtable_project: Optional[str], bigtable_instance: Optional[str], cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, stencil_url: Optional[str] = None, stencil_token: Optional[str] = None)[source]
Bases:
IngestionJobParameters
- get_arguments() List[str] [source]
Getter for job arguments, e.g. ["--source", '{"kafka": ...}', ...].
- Returns
List of arguments.
- Return type
List[str]
- get_job_type() SparkJobType [source]
Getter for job type.
- Returns
Job type enum.
- Return type
SparkJobType
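For illustration, a sketch of building batch ingestion parameters for a Redis online store. The feature_table and source dicts, the jar path, and the helper variables are placeholders (in practice they are the serialized FeatureTable spec and batch source produced by the Feast SDK):

    from datetime import datetime, timedelta
    from feast_spark.pyspark.abc import BatchIngestionJobParameters

    # feature_table_dict and source_dict are assumed to be the serialized
    # FeatureTable spec and batch source produced by the Feast SDK.
    params = BatchIngestionJobParameters(
        feature_table=feature_table_dict,
        source=source_dict,
        start=datetime.utcnow() - timedelta(days=1),
        end=datetime.utcnow(),
        jar="gs://feast-jobs/feast-ingestion-spark.jar",  # hypothetical jar location
        redis_host="localhost",
        redis_port=6379,
        redis_password=None,
        redis_ssl=False,
        bigtable_project=None,
        bigtable_instance=None,
    )
    args = params.get_arguments()  # CLI arguments such as ["--source", ...]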
- class feast_spark.pyspark.abc.IngestionJobParameters(feature_table: Dict, source: Dict, jar: str, redis_host: Optional[str] = None, redis_port: Optional[int] = None, redis_password: Optional[str] = None, redis_ssl: Optional[bool] = None, bigtable_project: Optional[str] = None, bigtable_instance: Optional[str] = None, cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, stencil_url: Optional[str] = None, stencil_token: Optional[str] = None, drop_invalid_rows: bool = False)[source]
Bases:
SparkJobParameters
- get_arguments() List[str] [source]
Getter for job arguments, e.g. ["--source", '{"kafka": ...}', ...].
- Returns
List of arguments.
- Return type
List[str]
- get_class_name() Optional[str] [source]
Getter for the main class name, if applicable.
- Returns
Java class path, e.g. feast.ingestion.IngestionJob.
- Return type
Optional[str]
- class feast_spark.pyspark.abc.JobLauncher[source]
Bases:
ABC
Submits Spark jobs (historical feature retrieval, batch ingestion, scheduled batch ingestion, and stream ingestion) to a Spark cluster.
- abstract historical_feature_retrieval(retrieval_job_params: RetrievalJobParameters) RetrievalJob [source]
Submits a historical feature retrieval job to a Spark cluster.
- Raises
SparkJobFailure – The Spark job submission failed, the job encountered an error during execution, or it timed out.
- Returns
Wrapper around the remote job that returns the file URI of the result file.
- Return type
RetrievalJob
- abstract list_jobs(include_terminated: bool, project: Optional[str], table_name: Optional[str]) List[SparkJob] [source]
- abstract offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob [source]
Submits a batch ingestion job to a Spark cluster.
- Raises
SparkJobFailure – The Spark job submission failed, the job encountered an error during execution, or it timed out.
- Returns
Wrapper around the remote job that can be used to check when the job has completed.
- Return type
BatchIngestionJob
- abstract schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]
Submits a scheduled batch ingestion job to a Spark cluster.
- Raises
SparkJobFailure – The Spark job submission failed, the job encountered an error during execution, or it timed out.
- Returns
Wrapper around the remote job that can be used to check when the job has completed.
- Return type
ScheduledBatchIngestionJob
- abstract start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob [source]
Starts a stream ingestion job on a Spark cluster.
- Raises
SparkJobFailure – The Spark job submission failed, the job encountered an error during execution, or it timed out.
- Returns
Wrapper around the remote job.
- Return type
StreamIngestionJob
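A minimal skeleton of a custom launcher, covering the abstract methods documented above; MyClusterLauncher is hypothetical and each body is left as a stub for a concrete implementation (e.g. Dataproc, EMR, or a standalone cluster):

    from typing import List, Optional

    from feast_spark.pyspark.abc import (
        BatchIngestionJob, BatchIngestionJobParameters, JobLauncher,
        RetrievalJob, RetrievalJobParameters,
        ScheduledBatchIngestionJobParameters, SparkJob,
        StreamIngestionJob, StreamIngestionJobParameters,
    )

    class MyClusterLauncher(JobLauncher):  # hypothetical implementation
        def historical_feature_retrieval(
            self, retrieval_job_params: RetrievalJobParameters
        ) -> RetrievalJob:
            raise NotImplementedError  # submit job, return a RetrievalJob wrapper

        def offline_to_online_ingestion(
            self, ingestion_job_params: BatchIngestionJobParameters
        ) -> BatchIngestionJob:
            raise NotImplementedError

        def schedule_offline_to_online_ingestion(
            self, ingestion_job_params: ScheduledBatchIngestionJobParameters
        ):
            raise NotImplementedError

        def start_stream_to_online_ingestion(
            self, ingestion_job_params: StreamIngestionJobParameters
        ) -> StreamIngestionJob:
            raise NotImplementedError

        def list_jobs(
            self,
            include_terminated: bool,
            project: Optional[str],
            table_name: Optional[str],
        ) -> List[SparkJob]:
            return []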
- class feast_spark.pyspark.abc.RetrievalJob[source]
Bases:
SparkJob
Container for the historical feature retrieval job result.
- abstract get_output_file_uri(timeout_sec=None, block=True)[source]
Get the output file URI of the result file. This method blocks until the job succeeds, or raises if the job does not complete successfully within the timeout.
- Parameters
timeout_sec (int) – Maximum number of seconds to wait until the job is done. If timeout_sec is exceeded or the job fails, an exception is raised.
block (bool) – If False, do not block until the job is done. If block=True, the timeout parameter is ignored.
- Raises
SparkJobFailure – The Spark job submission failed, the job encountered an error during execution, or it timed out.
- Returns
File URI of the result file.
- Return type
str
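For example, a sketch of consuming a retrieval job, where job is assumed to have been returned by a launcher's historical_feature_retrieval call:

    from feast_spark.pyspark.abc import SparkJobFailure

    try:
        # Block for up to 10 minutes, then raise if the job is still running.
        output_uri = job.get_output_file_uri(timeout_sec=600)
        print(f"Results written to {output_uri}")
    except SparkJobFailure as exc:
        print(f"Retrieval failed or timed out: {exc}")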
- class feast_spark.pyspark.abc.RetrievalJobParameters(project: str, feature_tables: List[Dict], feature_tables_sources: List[Dict], entity_source: Dict, destination: Dict, extra_packages: Optional[List[str]] = None, checkpoint_path: Optional[str] = None)[source]
Bases:
SparkJobParameters
- get_arguments() List[str] [source]
Getter for job arguments, e.g. ["--source", '{"kafka": ...}', ...].
- Returns
List of arguments.
- Return type
List[str]
- get_extra_packages() List[str] [source]
Getter for extra Maven packages to be included on the driver and executor classpath, if applicable.
- Returns
List of Maven packages.
- Return type
List[str]
- get_job_type() SparkJobType [source]
Getter for job type.
- Returns
Job type enum.
- Return type
SparkJobType
- class feast_spark.pyspark.abc.ScheduledBatchIngestionJobParameters(feature_table: Dict, source: Dict, ingestion_timespan: int, cron_schedule: str, jar: str, redis_host: Optional[str], redis_port: Optional[int], redis_password: Optional[str], redis_ssl: Optional[bool], bigtable_project: Optional[str], bigtable_instance: Optional[str], cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, stencil_url: Optional[str] = None, stencil_token: Optional[str] = None)[source]
Bases:
IngestionJobParameters
- get_arguments() List[str] [source]
Getter for job arguments, e.g. ["--source", '{"kafka": ...}', ...].
- Returns
List of arguments.
- Return type
List[str]
- get_job_type() SparkJobType [source]
Getter for job type.
- Returns
Job type enum.
- Return type
SparkJobType
- class feast_spark.pyspark.abc.SparkJob[source]
Bases:
ABC
Base class for all Spark jobs.
- abstract get_id() str [source]
Getter for the job ID. The job ID must be unique for each Spark job submission.
- Returns
Job ID.
- Return type
str
- abstract get_status() SparkJobStatus [source]
Job status retrieval.
- Returns
Job status.
- Return type
SparkJobStatus
- exception feast_spark.pyspark.abc.SparkJobFailure[source]
Bases:
Exception
Raised when job submission fails, when the job encounters an error during execution, or on timeout.
- class feast_spark.pyspark.abc.SparkJobParameters[source]
Bases:
ABC
- abstract get_arguments() List[str] [source]
Getter for job arguments, e.g. ["--source", '{"kafka": ...}', ...].
- Returns
List of arguments.
- Return type
List[str]
- get_class_name() Optional[str] [source]
Getter for the main class name, if applicable.
- Returns
Java class path, e.g. feast.ingestion.IngestionJob.
- Return type
Optional[str]
- get_extra_packages() List[str] [source]
Getter for extra Maven packages to be included on the driver and executor classpath, if applicable.
- Returns
List of Maven packages.
- Return type
List[str]
- abstract get_job_type() SparkJobType [source]
Getter for job type.
- Returns
Job type enum.
- Return type
SparkJobType
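A sketch of a custom parameters class for a hypothetical job; only get_arguments and get_job_type are abstract, while get_class_name and get_extra_packages provide defaults:

    from typing import List

    from feast_spark.pyspark.abc import SparkJobParameters, SparkJobType

    class MyRetrievalParameters(SparkJobParameters):  # hypothetical
        def __init__(self, source_json: str):
            self._source_json = source_json

        def get_arguments(self) -> List[str]:
            return ["--source", self._source_json]

        def get_job_type(self) -> SparkJobType:
            return SparkJobType.HISTORICAL_RETRIEVAL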
- class feast_spark.pyspark.abc.SparkJobStatus(value)[source]
Bases:
Enum
Possible states of a Spark job.
- COMPLETED = 3
- FAILED = 2
- IN_PROGRESS = 1
- STARTING = 0
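The values trace the job lifecycle (STARTING, then IN_PROGRESS, then COMPLETED or FAILED), which makes simple polling loops possible. In the sketch below, job is assumed to be any SparkJob returned by a launcher:

    import time

    from feast_spark.pyspark.abc import SparkJobStatus

    # Poll every 10 seconds until the job leaves its active states.
    while job.get_status() in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS):
        time.sleep(10)
    if job.get_status() == SparkJobStatus.FAILED:
        raise RuntimeError(f"Spark job {job.get_id()} failed")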
- class feast_spark.pyspark.abc.SparkJobType(value)[source]
Bases:
Enum
Types of Spark jobs.
- BATCH_INGESTION = 1
- HISTORICAL_RETRIEVAL = 0
- SCHEDULED_BATCH_INGESTION = 3
- STREAM_INGESTION = 2
- class feast_spark.pyspark.abc.StreamIngestionJob[source]
Bases:
SparkJob
Container for the streaming ingestion job result.
- class feast_spark.pyspark.abc.StreamIngestionJobParameters(feature_table: Dict, source: Dict, jar: str, extra_jars: Optional[List[str]] = None, redis_host: Optional[str] = None, redis_port: Optional[int] = None, redis_password: Optional[str] = None, redis_ssl: Optional[bool] = None, bigtable_project: Optional[str] = None, bigtable_instance: Optional[str] = None, cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, checkpoint_path: Optional[str] = None, stencil_url: Optional[str] = None, stencil_token: Optional[str] = None, drop_invalid_rows: bool = False, triggering_interval: Optional[int] = None)[source]
Bases:
IngestionJobParameters
- get_arguments() List[str] [source]
Getter for job arguments, e.g. ["--source", '{"kafka": ...}', ...].
- Returns
List of arguments.
- Return type
List[str]
- get_job_type() SparkJobType [source]
Getter for job type.
- Returns
Job type enum.
- Return type
SparkJobType
feast_spark.pyspark.historical_feature_retrieval_job module
feast_spark.pyspark.launcher module
- feast_spark.pyspark.launcher.get_health_metrics(client: Client, project: str, table_names: List[str]) Dict[str, List[str]] [source]
- feast_spark.pyspark.launcher.get_stream_to_online_ingestion_params(client: Client, project: str, feature_table: FeatureTable, extra_jars: List[str]) StreamIngestionJobParameters [source]
- feast_spark.pyspark.launcher.list_jobs(include_terminated: bool, client: Client, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob] [source]
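For example, a sketch of listing every known job (including finished ones) for a single project; client is assumed to be a configured feast_spark Client:

    jobs = list_jobs(include_terminated=True, client=client, project="my_project")
    for job in jobs:
        print(job.get_id(), job.get_status())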
- feast_spark.pyspark.launcher.resolve_launcher(config: Config) JobLauncher [source]
- feast_spark.pyspark.launcher.schedule_offline_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]
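A sketch of registering a nightly ingestion schedule; the interpretation of ingestion_timespan as a number of days is an assumption here:

    schedule_offline_to_online_ingestion(
        client=client,                # assumed: configured feast_spark Client
        project="my_project",
        feature_table=table,          # assumed: a registered FeatureTable
        ingestion_timespan=1,         # assumed: days of data ingested per run
        cron_schedule="0 0 * * *",    # run at midnight every day
    )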
- feast_spark.pyspark.launcher.stage_dataframe(df, event_timestamp_column: str, config: Config) FileSource [source]
Helper function to upload a pandas dataframe, in Parquet format, to a temporary location (under SPARK_STAGING_LOCATION) and return it wrapped in a FileSource.
- Parameters
df (pandas.DataFrame) – the dataframe to upload.
event_timestamp_column (str) – the name of the timestamp column in the dataframe.
config (Config) – Feast config.
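A sketch of staging a local dataframe so it can serve as an entity source for historical retrieval; feast_config is assumed to be a pre-built Feast Config with SPARK_STAGING_LOCATION set:

    import pandas as pd

    entities = pd.DataFrame(
        {
            "driver_id": [1001, 1002],
            "event_timestamp": pd.to_datetime(["2021-06-01", "2021-06-02"]),
        }
    )
    entity_source = stage_dataframe(entities, "event_timestamp", feast_config)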
- feast_spark.pyspark.launcher.start_historical_feature_retrieval_job(client: Client, project: str, entity_source: Union[FileSource, BigQuerySource], feature_tables: List[FeatureTable], output_format: str, output_path: str) RetrievalJob [source]
- feast_spark.pyspark.launcher.start_historical_feature_retrieval_spark_session(client: Client, project: str, entity_source: Union[FileSource, BigQuerySource], feature_tables: List[FeatureTable])[source]
- feast_spark.pyspark.launcher.start_offline_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable, start: datetime, end: datetime) BatchIngestionJob [source]
- feast_spark.pyspark.launcher.start_stream_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable, extra_jars: List[str]) StreamIngestionJob [source]
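Putting the launcher helpers together, a sketch of backfilling the online store for the last day and then starting the streaming job; client and table are assumed to exist as in the earlier sketches:

    from datetime import datetime, timedelta

    end = datetime.utcnow()
    batch_job = start_offline_to_online_ingestion(
        client, "my_project", table, start=end - timedelta(days=1), end=end
    )
    print(batch_job.get_id(), batch_job.get_status())

    stream_job = start_stream_to_online_ingestion(
        client, "my_project", table, extra_jars=[]
    )
    print(stream_job.get_id(), stream_job.get_status())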