Transformations

This section describes the various transformations supported by JFrog ML.

SQL

Note

JFrog ML runs Spark SQL in the background. Please comply with Spark SQL standards.

The following is an example of creating a transformation using SQL:

from frogml.core.feature_store.feature_sets.read_policies import ReadPolicy
from frogml.feature_store.feature_sets import batch
from frogml.core.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="user-transaction-aggregations",
    key="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.TimeFrame(days=30)},
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT
            user_id,
            AVG(credit_amount) AS avg_credit_amount,
            STD(credit_amount) AS std_credit_amount,
            MAX(credit_amount) AS max_credit_amount,
            MIN(date_created) AS first_transaction,
            AVG(duration) AS avg_loan_duration,
            AVG(job) AS seniority_level
        FROM snowflake_datasource
        GROUP BY user_id""")

Creating Transformations

When creating transformations, keep the following guidelines in mind:

  1. Key Inclusion:

    • The resulting feature vector must include the feature set key used in the definition.

  2. Timestamp Column Requirement:

    • For read policies such as NewOnly and FullRead, the timestamp column must be included in the returned feature vector (see the sketch after this list).

  3. Use the data source name as the table name in the FROM clause.

  4. Make sure the column names resulting from the SQL contain no special characters. The allowed characters are: a-z, A-Z, 0-9, _.
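For illustration, a minimal SQL sketch that satisfies these guidelines, assuming a hypothetical data source named my_datasource with a user_id key and a created_at timestamp column:

SELECT
    user_id,                          -- feature set key
    MAX(created_at) AS created_at,    -- timestamp column (required for NewOnly / FullRead)
    COUNT(*) AS transaction_count     -- only a-z, A-Z, 0-9 and _ in column names
FROM my_datasource
GROUP BY user_id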

Note

Logging

JFrog supports the default Python logger, which you can import from the standard Python logging library.
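For example, a minimal sketch of emitting a log line with the standard library (the logger name is arbitrary):

import logging

logger = logging.getLogger(__name__)
logger.info("Registering the user-transaction-aggregations feature set")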

PySpark

To use this feature, ensure that you have installed frogml-sdk with the feature-store extra:

pip install -U "frogml-sdk[feature-store]"

A PySpark transformation is defined by creating a UDF that is responsible for the transformation logic.

UDF Definition:

  • Arguments:

    • df_dict: Dict[str, spark.DataFrame] - Mandatory

      A dictionary in the form of {'<batch_source_name>': df, ...}.

    • qwargs: Dict[str, Any] - Optional

      If added, runtime parameters are injected via qwargs (e.g. qwak_ingestion_start_timestamp, qwak_ingestion_end_timestamp); see the sketch after this list.

  • Return value: spark.DataFrame

The returned df (PySpark DataFrame) must contain a column representing the configured key. The df column names must not include whitespace or special characters.
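For illustration, the following is a minimal sketch of a UDF that reads the injected ingestion window from qwargs. It assumes the snowflake_transactions_table data source and date_created column from the examples in this guide; the function name and the returned aggregation are arbitrary. In a real feature set, the UDF would be nested under the transform function, as shown in the full example below.

from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F


def filtered_amount_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
    df = df_dict['snowflake_transactions_table']

    # Runtime parameters injected at execution time, when available
    start_ts = qwargs.get('qwak_ingestion_start_timestamp')
    end_ts = qwargs.get('qwak_ingestion_end_timestamp')

    if start_ts is not None and end_ts is not None:
        # Restrict the input to the current ingestion window
        df = df.filter(F.col('date_created').between(start_ts, end_ts))

    return df.groupby('user').agg(F.avg('amount').alias('avg_amount'))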

Important

Python and Dependency Restrictions

To ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a feature set with a Koalas transformation. Additionally, ensure that the cloudpickle version is locked to 2.2.1.
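For example, the dependency can be pinned with pip:

pip install "cloudpickle==2.2.1"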

from typing import Dict, Any

import pyspark.sql as spark
import pyspark.sql.functions as F

from frogml.feature_store.feature_sets import batch
from frogml.core.feature_store.feature_sets.read_policies import ReadPolicy
from frogml.core.feature_store.feature_sets.transformations import PySparkTransformation

@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:
        df = df_dict['snowflake_transactions_table']
        agg_df = df.groupby('user').agg(F.max('amount').alias("max_amount"))

        return agg_df

    return PySparkTransformation(function=amount_stats)

Warning

Function Scope and Dependencies

PySpark function scope and variables must be defined under the transform function, as shown in the code snippet above.

At runtime, only PySpark and the Python standard library are available.

Note

Logging

JFrog supports the default Python logger, which you can import from the standard Python logging library.

Warnings about PySpark Usage Patterns

Avoid using DataFrame.localCheckpoint. Although local checkpointing might improve the performance of some workloads, local checkpoints are ephemeral, have limited disk space, and can lead to execution failures. Regular checkpoints are recommended instead for most cases.
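For illustration, a minimal sketch of using a regular checkpoint instead of a local one (the checkpoint directory is an assumption; use a path accessible to your Spark cluster):

from pyspark.sql import DataFrame, SparkSession


def checkpoint_df(df: DataFrame) -> DataFrame:
    # Configure a checkpoint directory once per session; the path below is only an example
    spark_session = SparkSession.builder.getOrCreate()
    spark_session.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    # df = df.localCheckpoint()  # avoid: ephemeral and limited to local disk
    return df.checkpoint()       # regular checkpoint, persisted to the configured directory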

Pandas On Spark

Pandas On Spark is a pandas API implementation on top of Spark. Please ensure your code is compliant with the Pandas On Spark library.

The User Defined Function (UDF) receives a dictionary in the form of {'<batch_source_name>': pyspark.pandas.DataFrame ...} as input.

The returned pyspark.pandas.DataFrame (Pandas On Spark DataFrame) must contain columns representing the configured key and the timestamp column. The DataFrame must not include complex columns, such as a multi-index, and column names must not include whitespace or special characters.

Make sure that column names returned from the UDF do not contain special characters; see the sketch below.

The allowed characters are: a-z, A-Z, 0-9, _.
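For illustration, a minimal sketch of sanitizing column names before returning the DataFrame (the replacement strategy, an underscore per disallowed character, is an assumption):

import re

import pyspark.pandas as ps


def sanitize_columns(psdf: ps.DataFrame) -> ps.DataFrame:
    # Replace every character outside a-z, A-Z, 0-9 and _ with an underscore
    psdf.columns = [re.sub(r"[^a-zA-Z0-9_]", "_", str(col)) for col in psdf.columns]
    return psdf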

🚧 Restrictions

Deployment - supported for Hybrid deployments ONLY.

Dependencies - to ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a Feature Set with a Pandas On Spark transformation.

from typing import Dict, Any
from frogml.feature_store.feature_sets import batch
from frogml.core.feature_store.feature_sets.read_policies import ReadPolicy
from frogml.core.feature_store.feature_sets.transformations import PandasOnSparkTransformation
from pyspark.pandas import DataFrame


@batch.feature_set(
    name="user-features",
    key="user",
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},
    timestamp_column_name="date_created"
)
@batch.scheduling(cron_expression="0 8 * * *")
def transform():
    def amount_stats(df_dict: Dict[str, DataFrame], qwargs: Dict[str, Any]) -> DataFrame:
        ps_df = df_dict['snowflake_transactions_table']
        # Aggregate per user, flatten the column names and keep the key as a regular column
        agg_psdf = ps_df.groupby('user').agg({'amount': ['mean', 'sum']})
        agg_psdf.columns = ['avg_amount', 'sum_amount']
        return agg_psdf.reset_index()

    return PandasOnSparkTransformation(function=amount_stats)

Important

Function Scope and Dependencies

Pandas On Spark function scope and variables must be defined under the transform function, as shown in the code snippet above.

Note

Logging

We support the default Python logger, which you can import from the standard Python logging library.