This section describes the transformations supported by JFrog ML.
SQL
Note
JFrog ML runs Spark SQL in the background. Please comply with Spark Standards.
The following is an example of creating a transformation using SQL:
from frogml.core.feature_store.feature_sets.read_policies import ReadPolicy
from frogml.feature_store.feature_sets import batch
from frogml.core.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="user-transaction-aggregations",
    key="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.TimeFrame(days=30)},
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT
            user_id,
            AVG(credit_amount) AS avg_credit_amount,
            STD(credit_amount) AS std_credit_amount,
            MAX(credit_amount) AS max_credit_amount,
            MIN(date_created)  AS first_transaction,
            AVG(duration)      AS avg_loan_duration,
            AVG(job)           AS seniority_level
        FROM snowflake_datasource
        GROUP BY user_id""")
Creating Transformations
When creating transformations, keep the following guidelines in mind:
Key inclusion: the resulting feature vector must include the feature set key used in the definition.
Timestamp column requirement: for read policies such as NewOnly and FullRead, the timestamp column must be included in the returned feature vector.
Table name: use the data source name as the table name in the FROM clause.
Column naming: make sure the column names produced by the SQL contain no special characters. The allowed characters are: a-z, A-Z, 0-9, _.
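As an illustration of the column-naming rule, here is a small hypothetical helper (not part of the JFrog ML SDK) that validates a name against the allowed character set and replaces disallowed characters:

```python
import re

# Allowed characters for feature column names: a-z, A-Z, 0-9, _
_VALID_COLUMN = re.compile(r"^[A-Za-z0-9_]+$")

def is_valid_column_name(name: str) -> bool:
    """Return True if the column name uses only allowed characters."""
    return bool(_VALID_COLUMN.match(name))

def sanitize_column_name(name: str) -> str:
    """Replace every disallowed character with an underscore."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)
```

Renaming columns in the SQL itself (via AS aliases, as in the example above) is usually the simplest way to satisfy this rule.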
Note
Logging
JFrog supports the default Python logger, which you can import from the standard Python logging library.
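For example, a standard-library logger can be used inside a transformation function. A minimal sketch (the logger name and message are arbitrary):

```python
import logging

# Standard Python logger -- no JFrog-specific setup required.
logger = logging.getLogger("user_features")

def log_row_count(row_count: int) -> str:
    """Log and return a short status message about the transformation."""
    message = f"aggregated {row_count} rows"
    logger.info(message)
    return message
```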
PySpark
To use this feature, ensure that you have installed the frogml-sdk with the feature-store extra:
pip install -U "frogml-sdk[feature-store]"
A PySpark transformation is defined by creating a UDF that implements the transformation logic.
UDF Definition:
Arguments:
df_dict: Dict[str, spark.DataFrame] - Mandatory. A dictionary in the form of {'<batch_source_name>': df, ...}.
qwargs: Dict[str, Any] - Optional. If added, runtime parameters will be injected via qwargs (e.g. qwak_ingestion_start_timestamp, qwak_ingestion_end_timestamp).
Return value: spark.DataFrame
The returned df (PySpark DataFrame) must contain a column representing the configured key. The df column names must not include whitespaces or special characters.
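The runtime parameters mentioned above arrive as plain dictionary entries, so a UDF can read them with ordinary dict access. A minimal sketch (the key names are taken from the text above; returning None when a key is absent is a hypothetical convention, not SDK behavior):

```python
from typing import Any, Dict, Optional, Tuple

def ingestion_window(qwargs: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Extract the ingestion window injected at runtime, if present."""
    start = qwargs.get("qwak_ingestion_start_timestamp")
    end = qwargs.get("qwak_ingestion_end_timestamp")
    return start, end
```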
Important
Python and Dependency Restrictions
To ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a feature set with a PySpark transformation. Additionally, ensure that the cloudpickle version is locked to 2.2.1.
from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F

from frogml.feature_store.feature_sets import batch
from frogml.core.feature_store.feature_sets.read_policies import ReadPolicy
from frogml.core.feature_store.feature_sets.transformations import PySparkTransformation

@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
        df = df_dict['snowflake_transactions_table']
        agg_df = df.groupby('user').agg(F.max('amount').alias("max_amount"))
        return agg_df
    return PySparkTransformation(function=amount_stats)
Warning
Function Scope and Dependencies
PySpark function scope and variables must be defined under the transform function, as shown in the code snippet above.
At runtime, only PySpark and Python native libraries are available.
Note
Logging
JFrog supports the default Python logger, which you can import from the standard Python logging library.
Warnings about PySpark Usage Patterns
Avoid using DataFrame.localCheckpoint. Although local checkpointing might improve the performance of some workloads, local checkpoints are ephemeral, have limited disk space, and can lead to execution failures. Regular checkpoints are recommended instead for most cases.
Pandas On Spark
Pandas On Spark is a pandas implementation on top of Spark. Please ensure your code complies with the Pandas On Spark library.
The User Defined Function (UDF) receives a dictionary in the form of {'<batch_source_name>': pyspark.pandas.DataFrame ...} as input.
The returned pyspark.pandas.DataFrame (Pandas On Spark DataFrame) must contain a column representing the configured key and timestamp column. The psdf must not include complex columns, such as multi-index, and the name must not include whitespaces or special characters.
Make sure that column names returned from the UDF do not contain special characters.
The allowed characters are: a-z, A-Z, 0-9, _.
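The "no complex columns" rule typically bites after a groupby aggregation with multiple statistics, which produces multi-level column labels. A plain-pandas sketch of flattening them (pyspark.pandas mirrors this pandas behaviour; the frame below is purely illustrative):

```python
import pandas as pd

df = pd.DataFrame({"user": ["a", "a", "b"], "amount": [10.0, 20.0, 5.0]})

# Aggregating with a list of functions yields MultiIndex columns
# such as ('amount', 'mean') -- not allowed in a returned feature vector.
agg = df.groupby("user").agg({"amount": ["mean", "sum"]})

# Flatten to simple snake_case names and restore the key column.
agg.columns = ["_".join(col) for col in agg.columns]
agg = agg.reset_index()
```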
🚧 Restrictions
Deployment - supported for Hybrid deployments ONLY.
Dependencies - to ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a Feature Set with a Pandas On Spark transformation.
from typing import Dict, Any

from pyspark.pandas import DataFrame

from frogml.feature_store.feature_sets import batch
from frogml.core.feature_store.feature_sets.read_policies import ReadPolicy
from frogml.core.feature_store.feature_sets.transformations import PandasOnSparkTransformation

@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(df_dict: Dict[str, DataFrame], qwargs: Dict[str, Any]) -> DataFrame:
        ps_df = df_dict['snowflake_transactions_table']
        agg_psdf = ps_df.groupby('user').agg({'amount': ['avg', 'sum']})
        # Flatten the multi-level columns produced by agg and restore the
        # key column: the returned DataFrame must not contain complex columns.
        agg_psdf.columns = ['_'.join(col) for col in agg_psdf.columns]
        agg_psdf = agg_psdf.reset_index()
        return agg_psdf
    return PandasOnSparkTransformation(function=amount_stats)
Important
Function Scope and Dependencies
Pandas On Spark function scope and variables must be defined under the transform function, as shown in the code snippet above.
Note
Logging
We support the default Python logger, which you can import from the standard Python logging library.