Source code for feast_spark.job_service

import logging
import os
import signal
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple, cast

import grpc
from google.api_core.exceptions import FailedPrecondition
from google.protobuf.timestamp_pb2 import Timestamp
from prometheus_client import start_http_server

from feast import Client as FeastClient
from feast import FeatureTable
from feast.core import JobService_pb2_grpc as LegacyJobService_pb2_grpc
from feast.data_source import DataSource
from feast_spark import Client as Client
from feast_spark.api import JobService_pb2_grpc
from feast_spark.api.JobService_pb2 import (
    CancelJobResponse,
    GetHealthMetricsResponse,
    GetHistoricalFeaturesRequest,
    GetHistoricalFeaturesResponse,
    GetJobResponse,
)
from feast_spark.api.JobService_pb2 import Job as JobProto
from feast_spark.api.JobService_pb2 import (
    JobStatus,
    JobType,
    ListJobsResponse,
    ScheduleOfflineToOnlineIngestionJobRequest,
    ScheduleOfflineToOnlineIngestionJobResponse,
    StartOfflineToOnlineIngestionJobRequest,
    StartOfflineToOnlineIngestionJobResponse,
    StartStreamToOnlineIngestionJobRequest,
    StartStreamToOnlineIngestionJobResponse,
    UnscheduleOfflineToOnlineIngestionJobRequest,
    UnscheduleOfflineToOnlineIngestionJobResponse,
)
from feast_spark.constants import ConfigOptions as opt
from feast_spark.lock_manager import JobOperation, JobOperationLock
from feast_spark.metrics import job_schedule_count, job_submission_count
from feast_spark.pyspark.abc import (
    BatchIngestionJob,
    RetrievalJob,
    SparkJob,
    SparkJobStatus,
    StreamIngestionJob,
)
from feast_spark.pyspark.launcher import (
    get_health_metrics,
    get_job_by_id,
    get_stream_to_online_ingestion_params,
    list_jobs,
    schedule_offline_to_online_ingestion,
    start_historical_feature_retrieval_job,
    start_offline_to_online_ingestion,
    start_stream_to_online_ingestion,
    unschedule_offline_to_online_ingestion,
)
from feast_spark.pyspark.launchers.k8s.k8s import JobNotFoundException
from feast_spark.third_party.grpc.health.v1.HealthService_pb2 import (
    HealthCheckResponse,
    ServingStatus,
)
from feast_spark.third_party.grpc.health.v1.HealthService_pb2_grpc import (
    HealthServicer,
    add_HealthServicer_to_server,
)

logger = logging.getLogger(__name__)


def _job_to_proto(spark_job: SparkJob) -> JobProto:
    job = JobProto()
    job.id = spark_job.get_id()
    job.log_uri = cast(str, spark_job.get_log_uri() or "")
    job.error_message = cast(str, spark_job.get_error_message() or "")
    status = spark_job.get_status()
    if status == SparkJobStatus.COMPLETED:
        job.status = JobStatus.JOB_STATUS_DONE
    elif status == SparkJobStatus.IN_PROGRESS:
        job.status = JobStatus.JOB_STATUS_RUNNING
    elif status == SparkJobStatus.FAILED:
        job.status = JobStatus.JOB_STATUS_ERROR
    elif status == SparkJobStatus.STARTING:
        job.status = JobStatus.JOB_STATUS_PENDING
    else:
        raise ValueError(f"Invalid job status {status}")

    if isinstance(spark_job, RetrievalJob):
        job.type = JobType.RETRIEVAL_JOB
        job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
    elif isinstance(spark_job, BatchIngestionJob):
        job.type = JobType.BATCH_INGESTION_JOB
        job.batch_ingestion.table_name = spark_job.get_feature_table()
    elif isinstance(spark_job, StreamIngestionJob):
        job.type = JobType.STREAM_INGESTION_JOB
        job.stream_ingestion.table_name = spark_job.get_feature_table()
    else:
        raise ValueError(f"Invalid job type {job}")

    job.start_time.FromDatetime(spark_job.get_start_time())

    return job


class JobServiceServicer(JobService_pb2_grpc.JobServiceServicer):
    def __init__(self, client: Client):
        self.client = client

    @property
    def _whitelisted_projects(self) -> Optional[List[str]]:
        if self.client.config.exists(opt.WHITELISTED_PROJECTS):
            whitelisted_projects = self.client.config.get(opt.WHITELISTED_PROJECTS)
            return whitelisted_projects.split(",")
        return None

    def is_whitelisted(self, project: str):
        # Whitelisted projects not specified, allow all projects
        if not self._whitelisted_projects:
            return True
        return project in self._whitelisted_projects

    def StartOfflineToOnlineIngestionJob(
        self, request: StartOfflineToOnlineIngestionJobRequest, context
    ):
        """Start job to ingest data from offline store into online store"""
        job_submission_count.labels(
            "batch_ingestion", request.project, request.table_name
        ).inc()

        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
        )

        job = start_offline_to_online_ingestion(
            client=self.client,
            project=request.project,
            feature_table=feature_table,
            start=request.start_date.ToDatetime(),
            end=request.end_date.ToDatetime(),
        )

        job_start_timestamp = Timestamp()
        job_start_timestamp.FromDatetime(job.get_start_time())
        return StartOfflineToOnlineIngestionJobResponse(
            id=job.get_id(),
            job_start_time=job_start_timestamp,
            table_name=request.table_name,
            log_uri=job.get_log_uri(),  # type: ignore
        )

    def ScheduleOfflineToOnlineIngestionJob(
        self, request: ScheduleOfflineToOnlineIngestionJobRequest, context
    ):
        """Schedule job to ingest data from offline store into online store periodically"""
        job_schedule_count.labels(request.project, request.table_name).inc()

        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
        )

        schedule_offline_to_online_ingestion(
            client=self.client,
            project=request.project,
            feature_table=feature_table,
            ingestion_timespan=request.ingestion_timespan,
            cron_schedule=request.cron_schedule,
        )
        return ScheduleOfflineToOnlineIngestionJobResponse()

    def UnscheduleOfflineToOnlineIngestionJob(
        self, request: UnscheduleOfflineToOnlineIngestionJobRequest, context
    ):
        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
        )
        unschedule_offline_to_online_ingestion(
            client=self.client, project=request.project, feature_table=feature_table,
        )
        return UnscheduleOfflineToOnlineIngestionJobResponse()

    def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
        """Produce a training dataset, return a job id that will provide a file reference"""
        job_submission_count.labels("historical_retrieval", request.project, "").inc()

        job = start_historical_feature_retrieval_job(
            client=self.client,
            project=request.project,
            entity_source=DataSource.from_proto(request.entity_source),
            feature_tables=self.client._get_feature_tables_from_feature_refs(
                list(request.feature_refs), request.project
            ),
            output_format=request.output_format,
            output_path=request.output_location,
        )

        output_file_uri = job.get_output_file_uri(block=False)
        job_start_timestamp = Timestamp()
        job_start_timestamp.FromDatetime(job.get_start_time())
        return GetHistoricalFeaturesResponse(
            id=job.get_id(),
            output_file_uri=output_file_uri,
            job_start_time=job_start_timestamp,
        )

    def StartStreamToOnlineIngestionJob(
        self, request: StartStreamToOnlineIngestionJobRequest, context
    ):
        """Start job to ingest data from stream into online store"""
        job_submission_count.labels(
            "streaming", request.project, request.table_name
        ).inc()

        if not self.is_whitelisted(request.project):
            raise ValueError(
                f"Project {request.project} is not whitelisted. "
                "Please contact your Feast administrator to whitelist it."
            )

        feature_table = self.client.feature_store.get_feature_table(
            request.table_name, request.project
        )

        if self.client.config.getboolean(opt.JOB_SERVICE_ENABLE_CONTROL_LOOP):
            # If the control loop is enabled, return existing stream ingestion job id
            # instead of starting a new one
            params = get_stream_to_online_ingestion_params(
                self.client, request.project, feature_table, []
            )
            job_hash = params.get_job_hash()
            for job in list_jobs(include_terminated=True, client=self.client):
                if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash:
                    job_start_timestamp = Timestamp()
                    job_start_timestamp.FromDatetime(job.get_start_time())
                    return StartStreamToOnlineIngestionJobResponse(
                        id=job.get_id(),
                        job_start_time=job_start_timestamp,
                        table_name=job.get_feature_table(),
                        log_uri=job.get_log_uri(),  # type: ignore
                    )
            raise RuntimeError(
                "Feast Job Service has control loop enabled, "
                "but couldn't find the existing stream ingestion job for the given FeatureTable"
            )

        # TODO: add extra_jars to request
        job = start_stream_to_online_ingestion(
            client=self.client,
            project=request.project,
            feature_table=feature_table,
            extra_jars=[],
        )

        job_start_timestamp = Timestamp()
        job_start_timestamp.FromDatetime(job.get_start_time())
        return StartStreamToOnlineIngestionJobResponse(
            id=job.get_id(),
            job_start_time=job_start_timestamp,
            table_name=request.table_name,
            log_uri=job.get_log_uri(),  # type: ignore
        )

    def ListJobs(self, request, context):
        """List all types of jobs"""
        jobs = list_jobs(
            include_terminated=request.include_terminated,
            project=request.project,
            table_name=request.table_name,
            client=self.client,
        )
        return ListJobsResponse(jobs=[_job_to_proto(job) for job in jobs])

    def CancelJob(self, request, context):
        """Stop a single job"""
        job = get_job_by_id(request.job_id, client=self.client)
        job.cancel()
        return CancelJobResponse()

    def GetJob(self, request, context):
        """Get details of a single job"""
        job = get_job_by_id(request.job_id, client=self.client)
        return GetJobResponse(job=_job_to_proto(job))

    def GetHealthMetrics(self, request, context):
        """Return ingestion jobs health metrics"""
        metrics = get_health_metrics(
            project=request.project,
            table_names=request.table_names,
            client=self.client,
        )
        return GetHealthMetricsResponse(
            passed=metrics["passed"], failed=metrics["failed"]
        )
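

# Illustrative sketch (not part of the module above): how the WHITELISTED_PROJECTS
# option interacts with JobServiceServicer.is_whitelisted. The option value is a
# comma-separated string, so with a hypothetical setting such as
# WHITELISTED_PROJECTS="default,driver_ranking":
#
#   servicer.is_whitelisted("default")          # -> True, project is listed
#   servicer.is_whitelisted("fraud_detection")  # -> False, project is not listed
#
# When the option is unset or empty, is_whitelisted returns True for every project.
# Within this module, only StartStreamToOnlineIngestionJob enforces this check;
# ensure_stream_ingestion_jobs below applies the same whitelist when filtering projects.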


def start_prometheus_serving(port: int = 8080) -> None:
    """Initialize Prometheus metric server"""
    start_http_server(port)
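

# Illustrative sketch (not part of the module above): prometheus_client's
# start_http_server exposes the process's registered metrics over plain HTTP,
# so the job_submission_count / job_schedule_count counters used above can be
# scraped once the service is running. Port 8080 is assumed here; the actual
# port comes from JOB_SERVICE_PROMETHEUS_METRIC_PORT.
#
#   import urllib.request
#   body = urllib.request.urlopen("http://localhost:8080/metrics").read().decode()
#   print(body)  # Prometheus text exposition format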


def start_control_loop() -> None:
    """Starts a control loop that continuously ensures that the correct jobs are running.

    Currently this affects only the stream ingestion jobs. Please refer to
    ensure_stream_ingestion_jobs for full documentation on how the check works.
    """
    logger.info(
        "Feast Job Service is starting a control loop in a background thread, "
        "which will ensure that stream ingestion jobs are successfully running."
    )
    try:
        feature_store = FeastClient()
        client = Client(feature_store)
        while True:
            ensure_stream_ingestion_jobs(client, all_projects=True)
            time.sleep(1)
    except Exception:
        traceback.print_exc()
    finally:
        # Send an interrupt signal to the main thread to kill the server if the control loop fails
        os.kill(os.getpid(), signal.SIGINT)


class HealthServicerImpl(HealthServicer):
    def Check(self, request, context):
        return HealthCheckResponse(status=ServingStatus.SERVING)


class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        if handler_call_details.method != "/grpc.health.v1.Health/Check":
            logger.debug(handler_call_details)
        return continuation(handler_call_details)


def start_job_service() -> None:
    """
    Start Feast Job Service
    """
    feast_client = FeastClient()
    client = Client(feast_client)

    if client.config.getboolean(opt.JOB_SERVICE_ENABLE_CONTROL_LOOP):
        # Start the control loop thread only if it's enabled from configs
        thread = threading.Thread(target=start_control_loop, daemon=True)
        thread.start()

    metricServerThread = threading.Thread(
        target=start_prometheus_serving,
        daemon=True,
        args=[client.config.getint(opt.JOB_SERVICE_PROMETHEUS_METRIC_PORT)],
    )
    metricServerThread.start()

    server = grpc.server(ThreadPoolExecutor(), interceptors=(LoggingInterceptor(),))
    JobService_pb2_grpc.add_JobServiceServicer_to_server(
        JobServiceServicer(client), server
    )
    LegacyJobService_pb2_grpc.add_JobServiceServicer_to_server(
        JobServiceServicer(client), server
    )
    add_HealthServicer_to_server(HealthServicerImpl(), server)
    server.add_insecure_port("[::]:6568")
    server.start()
    logger.info("Feast Job Service is listening on port :6568")
    server.wait_for_termination()
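

# Illustrative sketch (not part of the module above): a minimal client for the gRPC
# server started by start_job_service, assuming the generated stub is named
# JobServiceStub and that ListJobsRequest exposes the fields read by ListJobs above
# (include_terminated, project, table_name).
#
#   import grpc
#   from feast_spark.api import JobService_pb2, JobService_pb2_grpc
#
#   channel = grpc.insecure_channel("localhost:6568")  # port used by the server above
#   stub = JobService_pb2_grpc.JobServiceStub(channel)
#   response = stub.ListJobs(
#       JobService_pb2.ListJobsRequest(
#           include_terminated=False, project="default", table_name=""
#       )
#   )
#   for job in response.jobs:
#       print(job.id, job.type, job.status)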


def _get_expected_job_hash_to_tables(
    client: Client, projects: List[str]
) -> Dict[str, Tuple[str, FeatureTable]]:
    """
    Checks all feature tables for the required project(s) and determines all required
    stream ingestion jobs from them. Outputs a map of the expected job_hash to a tuple
    of (project, feature_table).

    Args:
        client (Client): The Feast Spark client.
        projects (List[str]): The projects whose feature tables should be checked.

    Returns:
        Dict[str, Tuple[str, FeatureTable]]: Map of job_hash -> (project, feature_table)
            for expected stream ingestion jobs
    """
    job_hash_to_table_refs = {}

    for project in projects:
        feature_tables = client.feature_store.list_feature_tables(project)
        for feature_table in feature_tables:
            if feature_table.stream_source is not None:
                params = get_stream_to_online_ingestion_params(
                    client, project, feature_table, []
                )
                job_hash = params.get_job_hash()
                job_hash_to_table_refs[job_hash] = (project, feature_table)

    return job_hash_to_table_refs


def ensure_stream_ingestion_jobs(client: Client, all_projects: bool):
    """Ensures all required stream ingestion jobs are running and cleans up unnecessary jobs.

    More concretely, it will determine
    - which stream ingestion jobs are currently running
    - which stream ingestion jobs should be running

    And it'll do 2 kinds of operations
    - Cancel all running jobs that should not be running
    - Start all non-existent jobs that should be running

    Args:
        client (Client): The Feast Spark client.
        all_projects (bool): If true, runs the check for all projects. Otherwise only
            checks the client's current project.
    """
    projects = (
        client.feature_store.list_projects()
        if all_projects
        else [client.feature_store.project]
    )
    if client.config.exists(opt.WHITELISTED_PROJECTS):
        whitelisted_projects = client.config.get(opt.WHITELISTED_PROJECTS)
        if whitelisted_projects:
            whitelisted_projects = whitelisted_projects.split(",")
            projects = [
                project for project in projects if project in whitelisted_projects
            ]

    expected_job_hash_to_tables = _get_expected_job_hash_to_tables(client, projects)
    expected_job_hashes = set(expected_job_hash_to_tables.keys())

    jobs_by_hash: Dict[str, StreamIngestionJob] = {}

    # When we want to retry failed jobs, we shouldn't include terminated jobs here;
    # the control loop will then behave as if no job exists and will spawn a new one.
    for job in client.list_jobs(
        include_terminated=not client.config.getboolean(
            opt.JOB_SERVICE_RETRY_FAILED_JOBS
        )
    ):
        status = None
        try:
            status = job.get_status()
        except JobNotFoundException:
            logger.warning(f"{job.get_id()} was already removed")

        if (
            isinstance(job, StreamIngestionJob)
            and status is not None
            and status != SparkJobStatus.COMPLETED
        ):
            jobs_by_hash[job.get_hash()] = job

    existing_job_hashes = set(jobs_by_hash.keys())

    job_hashes_to_cancel = existing_job_hashes - expected_job_hashes
    job_hashes_to_start = expected_job_hashes - existing_job_hashes

    logger.debug(
        f"existing_job_hashes = {sorted(list(existing_job_hashes))} "
        f"expected_job_hashes = {sorted(list(expected_job_hashes))}"
    )

    lock_config = {
        "redis_host": client.config.get(opt.LOCK_MGR_REDIS_HOST),
        "redis_port": client.config.getint(opt.LOCK_MGR_REDIS_PORT),
        "lock_expiry": client.config.getint(opt.LOCK_EXPIRY),
    }

    for job_hash in job_hashes_to_start:
        # Any job that we wish to start should be in the expected table refs map
        project, feature_table = expected_job_hash_to_tables[job_hash]

        # Start the job only if the lock is available
        with JobOperationLock(
            job_hash=job_hash, operation=JobOperation.START, **lock_config
        ) as lock:
            if lock:
                logger.warning(
                    f"Starting a stream ingestion job for project={project}, "
                    f"table_name={feature_table.name} with job_hash={job_hash}"
                )
                client.start_stream_to_online_ingestion(
                    feature_table, [], project=project
                )
                # Pause between submissions so the scheduler isn't hit with a peak load
                time.sleep(client.config.getint(opt.JOB_SERVICE_PAUSE_BETWEEN_JOBS))

    for job_hash in job_hashes_to_cancel:
        job = jobs_by_hash[job_hash]

        job_status = None
        try:
            job_status = job.get_status()
        except JobNotFoundException:
            pass

        if job_status != SparkJobStatus.IN_PROGRESS:
            logger.warning(
                f"Can't cancel job with job_hash={job_hash} job_id={job.get_id()} "
                f"status={job.get_status()}"
            )
            continue

        logger.warning(
            f"Cancelling a stream ingestion job with job_hash={job_hash} "
            f"job_id={job.get_id()} status={job.get_status()}"
        )
        try:
            # Cancel the job only if the lock is available
            with JobOperationLock(
                job_hash=job_hash, operation=JobOperation.CANCEL, **lock_config
            ) as lock:
                if lock:
                    job.cancel()
        except FailedPrecondition as exc:
            logger.error(f"Job canceling failed with exception {exc}")
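

# Illustrative sketch (not part of the module above): the reconciliation performed by
# ensure_stream_ingestion_jobs reduces to set arithmetic over job hashes. With
# hypothetical hash values:
#
#   expected_job_hashes = {"hash_a", "hash_b", "hash_c"}  # derived from feature tables
#   existing_job_hashes = {"hash_b", "hash_c", "hash_d"}  # derived from running jobs
#
#   job_hashes_to_start = expected_job_hashes - existing_job_hashes   # {"hash_a"}
#   job_hashes_to_cancel = existing_job_hashes - expected_job_hashes  # {"hash_d"}
#
# Jobs present in both sets are left untouched. The per-hash JobOperationLock
# (backed by the Redis settings in lock_config) then gates each start or cancel so
# that it is not performed concurrently by multiple job-service instances.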