Batch data sources let you configure connections to data-at-rest sources.
To define a batch data source, create a configuration object that connects to the raw data source.
Batch data sources share three common parameters:
name: A unique data source identifier used to reference it from a feature set object. It may contain only letters, numbers, and underscores.
description: A general description.
date_created_column: Used to filter the data by the batch's start/end time. This column must be present in the data source and must hold the timestamp that represents each record's time.
Warning
date_created_column:
Values in this column must increase chronologically. If a record's date_created value is earlier than the previously ingested one, the event will not be ingested. Missed data can be added using Backfill.
Note
The default timestamp format for date_created_column is yyyy-MM-dd'T'HH:mm:ss, optionally with [.SSS][XXX]. For example: 2020-01-01T00:00:00.
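For illustration only, a minimal sketch (with hypothetical column and file names) of formatting a pandas column into this layout before ingestion:

import pandas as pd

# Hypothetical raw data with string event times
df = pd.DataFrame({
    'id': [1, 2],
    'event_time': ['2020-01-01 00:00:00', '2020-01-02 13:45:10'],
})

# Parse to datetime and render as yyyy-MM-dd'T'HH:mm:ss
df['date_created'] = pd.to_datetime(df['event_time']).dt.strftime('%Y-%m-%dT%H:%M:%S')
print(df['date_created'].iloc[0])  # 2020-01-01T00:00:00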
Registering New Data Sources
When registering a batch data source, the JFrog ML system validates it by attempting to fetch a sample, verifying that the system can query the data source.
Additionally, batch data sources support the following validation function:
def get_sample(self, number_of_rows: int = 10) -> DataFrame:
Usage example:
from frogml.feature_store.data_sources import ParquetSource, AnonymousS3Configuration

parquet_source = ParquetSource(
    name='parquet_source',
    description='a parquet source description',
    date_created_column='date_created',
    path="s3://bucket-name/data.parquet",
    filesystem_configuration=AnonymousS3Configuration()
)

pandas_df = parquet_source.get_sample()
When invoking this function, the JFrog ML system validates the data source before returning a Pandas DataFrame; if an error occurs while fetching the sample, the system indicates at which stage it failed (see the sketch after the list below).
For example, it can fail:
When connecting to the specified bucket.
When the date_created_column is not the right type or does not exist.
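A minimal sketch of catching such a failure with the parquet_source defined above (the exact exception type may vary, so a broad except is used here):

try:
    sample_df = parquet_source.get_sample(number_of_rows=10)
    print(sample_df.head())
except Exception as error:
    # The error message indicates at which stage validation failed,
    # e.g. connecting to the bucket or resolving date_created_column
    print(f'Data source validation failed: {error}')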
Available Data Source Types
Snowflake
Before creating a Snowflake connector, make sure you have the following:
A Snowflake user configured for unencrypted key-pair authentication (read-only access required).
Connectivity between the JFrog ML environment and the Snowflake host.
There are two distinct ways to use the Snowflake connector:
1. Providing a table:

from frogml.feature_store.data_sources import SnowflakeSource

snowflake_source = SnowflakeSource(
    name='snowflake_source',
    description='a snowflake source description',
    date_created_column='insert_date_column',
    host='<SnowflakeAddress/DNS:port>',
    username_secret_name='jfrogml_secret_snowflake_user',  # use secret service
    pem_private_key_secret_name='jfrogml_secret_snowflake_pem_private_key',  # use secret service
    database='db_name',
    schema='schema_name',
    warehouse='data_warehouse_name',
    table='snowflake_table'
)

2. Providing a query:

from frogml.feature_store.data_sources import SnowflakeSource

snowflake_source = SnowflakeSource(
    name='snowflake_source',
    description='a snowflake source description',
    date_created_column='insert_date_column',
    host='<SnowflakeAddress/DNS:port>',
    username_secret_name='jfrogml_secret_snowflake_user',  # use secret service
    pem_private_key_secret_name='jfrogml_secret_snowflake_pem_private_key',  # use secret service
    database='db_name',
    schema='schema_name',
    warehouse='data_warehouse_name',
    query='select feature1, feature2 from snowflake_table'
)
Note
JFrog ML only supports unencrypted private keys without the key delimiters (begin and end). See key-pair authentication.
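As a non-authoritative sketch, assuming your key file is rsa_key.p8, you could strip the begin/end delimiters and store only the key body as a secret using the SecretServiceClient shown later on this page:

from frogml.core.clients.secret_service import SecretServiceClient

with open('/path/to/rsa_key.p8', 'r') as f:
    pem_lines = f.read().splitlines()

# Drop the '-----BEGIN/END PRIVATE KEY-----' delimiters and keep only the key body
key_body = ''.join(line for line in pem_lines if not line.startswith('-----'))

SecretServiceClient().set_secret(
    name='jfrogml_secret_snowflake_pem_private_key',
    value=key_body,
)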
BigQuery
To access a BigQuery source, download the credentials.json file from GCP to your local file system.
Permissions
The credentials provided in the *credentials.json* file must have the following permissions:
bigquery.tables.create
bigquery.tables.getData
bigquery.tables.get
bigquery.readsessions.*
bigquery.jobs.create
Uploading Credentials
Once you've downloaded *credentials.json*, encode it with base64 and set it as a JFrog ML secret using the JFrog ML Secret Service.
import json
import base64
from frogml.core.clients.secret_service import SecretServiceClient
with open('/path/of/credentials/credentials.json', 'r') as f:
    creds = json.load(f)
creds64 = base64.b64encode(json.dumps(creds).encode('utf-8')).decode('utf-8')
secrets_service = SecretServiceClient()
secrets_service.set_secret(name='qwak_secret_big_query_creds', value=creds64)
Connecting to BigQuery
There are two distinct ways to use the BigQuery connector:
1. Providing dataset and table
from frogml.feature_store.data_sources import BigQuerySource

some_bigquery_source = BigQuerySource(
    name='big_query_source',
    description='a bigquery source description',
    date_created_column='date_created',
    credentials_secret_name='qwak_secret_big_query_creds',
    dataset='dataset_name',
    table='table_name',
    project='project_id',
    materialization_project='materialization_project_name',
    parent_project='parent_project',
    views_enabled=False
)
2. Providing sql
from frogml.feature_store.data_sources import BigQuerySource

big_query_source = BigQuerySource(
    name='big_query',
    description='a big query source description',
    date_created_column='date_created',
    credentials_secret_name='bigquerycred',
    project='project_id',
    sql="""SELECT l.id as id,
                  SUM(l.feature1) as feature1,
                  SUM(r.feature2) as feature2,
                  MAX(l.date_created) as date_created
           FROM `project_id.dataset.left` AS l
           JOIN `project_id.dataset.right` as r ON r.id = l.id
           GROUP BY id""",
    parent_project='',
    views_enabled=False
)
MongoDB
from frogml.feature_store.data_sources.batch.mongodb import MongoDbSource
mongo_source = MongoDbSource(
name='mongo_source',
description='a mongo source description',
date_created_column='insert_date_column',
hosts='<MongoAddress/DNS:Port>',
username_secret_name='qwak_secret_mongodb_user', # uses the JFrog ML Secret Service
password_secret_name='qwak_secret_mongodb_pass', # uses the JFrog ML Secret Service
database='db_name',
collection='collection_name',
connection_params='authSource=admin'
)
Amazon S3 Stored Files
Ingesting Data from Parquet Files
AWS S3 filesystem data sources support explicit credentials for a custom bucket (default: frogml bucket).
To access more of your data from a different S3 bucket, use this optional configuration.
After creating the relevant secrets using the JFrog ML CLI, you can use:
from frogml.feature_store.data_sources import ParquetSource, AwsS3FileSystemConfiguration

parquet_source = ParquetSource(
    name='my_source',
    description='some s3 data source',
    date_created_column='DATE_CREATED',
    path='s3://mybucket/parquet_test_data.parquet',
    filesystem_configuration=AwsS3FileSystemConfiguration(
        access_key_secret_name='mybucket_access_key',
        secret_key_secret_name='mybucket_secret_key',
        bucket='mybucket'
    )
)
Important
Timestamp Column
Ensure that the timestamp column in your Parquet file(s) is represented using the appropriate PyArrow timestamp data type with microsecond precision.
You can achieve this by casting the timestamp column to the desired precision. Here's an example:
timestamp_column_microseconds = timestamp_column.cast('timestamp[us]')

In the above code snippet, timestamp_column_microseconds refers to the modified timestamp column with microsecond precision. This column represents information like the date and time that a record was created, denoted as date_created.
Using Pandas timestamp data types, such as datetime[ns] or int64, will result in an error when fetching data from the Parquet source.
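As an illustrative sketch (file and column names here are hypothetical), the cast can be applied to a whole Parquet file with PyArrow:

import pyarrow as pa
import pyarrow.parquet as pq

table = pq.read_table('data.parquet')

# Cast the date_created column to a microsecond-precision timestamp
date_created_us = table.column('date_created').cast(pa.timestamp('us'))
table = table.set_column(
    table.schema.get_field_index('date_created'),
    'date_created',
    date_created_us,
)

pq.write_table(table, 'data_microseconds.parquet')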
Ingesting Data from CSV Files
CSV access works like reading a Parquet file from S3. We either specify the AWS access keys as environment variables or access a public object.
from frogml.feature_store.data_sources import CsvSource, AnonymousS3Configuration

csv_source = CsvSource(
    name='csv_source',
    description='a csv source description',
    date_created_column='date_created',
    path="s3://bucket-name/data.csv",
    filesystem_configuration=AnonymousS3Configuration(),
    quote_character='"',
    escape_character='"'
)
Note
Public S3 bucket access
When using any public bucket, such as jfrogml-public or nyc-tlc, use AnonymousS3Configuration to access it without credentials, as shown in the example above.
Important
Default timestamp format for date_created_column in CSV files should be yyyy-MM-dd'T'HH:mm:ss, optionally with [.SSS][XXX].
For example: 2020-01-01T00:00:00.
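A minimal, hypothetical sketch of writing such a CSV with pandas:

import pandas as pd

df = pd.DataFrame({
    'id': [1, 2],
    'date_created': pd.to_datetime(['2020-01-01', '2020-01-02 13:45:10']),
})

# Serialize timestamps as yyyy-MM-dd'T'HH:mm:ss
df.to_csv('data.csv', index=False, date_format='%Y-%m-%dT%H:%M:%S')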
Accessing Private Amazon S3 Buckets in Data Sources
To securely leverage data stored in Amazon S3 buckets within the JFrog ML feature store, we support two robust authentication methods. This guide provides a comprehensive overview of setting up access to private S3 buckets, ensuring that your data remains secure while being fully accessible for your data operations.
IAM Role ARN Based Authentication
This method allows JFrog ML to assume an IAM role with permissions to access your S3 bucket. Create an IAM role in AWS with the necessary permissions to access the S3 bucket. For a step-by-step guide, refer to Configuring IAM Roles for S3 Access.
from frogml.core.feature_store.data_sources.source_authentication import AwsAssumeRoleAuthentication

aws_authentication = AwsAssumeRoleAuthentication(role_arn='<YOUR_IAM_ROLE_ARN>')
Credentials Based Authentication
For scenarios where IAM role-based access isn't preferred, use your AWS access and secret keys, stored securely in the JFrog ML Secrets Management Service. Save your AWS access_key and secret_key in JFrog ML Secret Management.

from frogml.core.feature_store.data_sources.source_authentication import AwsCredentialsAuthentication

aws_authentication = AwsCredentialsAuthentication(
    access_key_secret_name='your-access-key-frogml-secret',
    secret_key_secret_name='your-secret-key-frogml-secret'
)
After setting up your authentication method, use the aws_authentication object to configure your CSV or Parquet data source by assigning it to the filesystem_configuration parameter, as in the example below:
from frogml.feature_store.data_sources.batch.csv import CsvSource
from frogml.feature_store.data_sources.batch.parquet import ParquetSource

csv_source = CsvSource(
    name='name_with_underscores',
    description='',
    date_created_column='your_date_related_column',
    path='s3://s3...',
    quote_character="'",
    escape_character="\\",
    filesystem_configuration=aws_authentication
)
Redshift
To connect to a Redshift source, grant access using either an AWS Access Key & Secret Key or an IAM Role.
from frogml.feature_store.data_sources import RedshiftSource

redshift_source = RedshiftSource(
    name='my_source',
    date_created_column='DATE_CREATED',
    description='Some Redshift Source',
    url="company-redshift-cluster.xyz.us-east-1.redshift.amazonaws.com:5439/DBName",
    db_table='my_table',
    query='base query when fetching data from Redshift',  # Must choose either db_table or query
    iam_role_arn='arn:aws:iam::123456789:role/assumed_role_redshift',
    db_user='dbuser_name',
)
MySQL
from frogml.feature_store.data_sources import MysqlSource
mysql_source = MysqlSource(
name='mysql_source',
description='a mysql source description',
date_created_column='date_created',
username_secret_name='jfrogml_secret_mysql_user', # uses the JFrogML Secret Service
password_secret_name='jfrogml_secret_mysql_pass', # uses the JFrogML Secret Service
url='<MysqlAddress/DNS:Port>',
db_table='db.table_name', # e.g. database1.table1
query='base query when fetching data from mysql' # Must choose either db_table or query
)
Postgres
from frogml.feature_store.data_sources.batch.postgres import ProtoPostgresqlSource
postgres_source = ProtoPostgresqlSource(
name='postgresql_source',
description='a postgres source description',
date_created_column='date_created',
username_secret_name='jfrogml_secret_postgres_user', # uses the JFrog ML Secret Service
password_secret_name='jfrogml_secret_postgres_pass', # uses the JFrog ML Secret Service
url='<PostgresqlAddress/DNS:Port/DBName>',
db_table='schema.table_name', # default schema: public
query='base query when fetching data from postgres' # Must choose either db_table or query
)
Clickhouse
from frogml.feature_store.data_sources import ClickhouseSource
clickhouse_source = ClickhouseSource(
name='clickhouse_source',
description='a clickhouse source description',
date_created_column='date_created', # Has to be of format DateTime64
username_secret_name='jfrogml_secret_clickhouse_user', # uses the JFrog ML Secret Service
password_secret_name='jfrogml_secret_clickhouse_pass', # uses the JFrog ML Secret Service
url='<ClickhouseAddress/DNS:Port/DBName>', # database name is optional
db_table='database_name.table_name', # default database: default
query='base query when fetching data from clickhouse' # Must choose either db_table or query
)
Vertica
from frogml.feature_store.data_sources import VerticaSource
vertica_source = VerticaSource(
name='vertica_source',
description='a vertica source description',
date_created_column='date_created',
username_secret_name='jfrogml_secret_vertica_user', # uses the JFrog ML Secret Service
password_secret_name='jfrogml_secret_vertica_pass', # uses the JFrog ML Secret Service
host='<VerticaHost>', # without :port suffix
port=5444,
database='MyVerticaDatabase',
schema='MyVerticaSchema', # e.g. public
table='table_name'
)
AWS Athena
The Athena source is used to connect JFrog ML to Amazon Athena, allowing users to query and ingest data seamlessly.
from frogml.feature_store.data_sources.batch.athena import AthenaSource
from frogml.core.feature_store.data_sources.source_authentication import AwsAssumeRoleAuthentication
from frogml.core.feature_store.data_sources.time_partition_columns import DatePartitionColumns
athena_source = AthenaSource(
name='my_athena_source',
description='my Athena source description',
date_created_column='date_created',
aws_region='us-east-1',
s3_output_location='s3://some-athena-queries-bucket/',
workgroup='some-workgroup',
query='SELECT * FROM "db"."table"',
aws_authentication=AwsAssumeRoleAuthentication(role_arn='some_role_arn'),
time_partition_columns=DatePartitionColumns(date_column_name='date_pt', date_format='%Y%m%d'),
)
Note
Workgroups
By default, your default workgroup in Athena is called primary. However, for optimal organization and resource management, it's recommended to establish a dedicated workgroup specifically for handling FeatureSet-related queries. This separation ensures that queries related to the JFrog ML FeatureSets are isolated from other users or applications utilizing AWS Athena, allowing for better debugging, query prioritization, and enhanced governance.
The data source configuration supports two ways of authenticating to AWS Athena:
aws_authentication: AwsAuthentication
Description: Authentication method to be used.
Mandatory: Yes
Options:
AwsAssumeRoleAuthentication
Description: Authentication using an assumed role.
Fields:
role_arn: str: Mandatory
Example:
from frogml.core.feature_store.data_sources.source_authentication import AwsAssumeRoleAuthentication

aws_authentication = AwsAssumeRoleAuthentication(role_arn='some_role_arn')
AwsCredentialsAuthentication
Description: Authentication using AWS credentials stored in JFrog ML secrets.
Fields:
access_key_secret_name: str: Mandatory
secret_key_secret_name: str: Mandatory
Example:
from frogml.core.feature_store.data_sources.source_authentication import AwsCredentialsAuthentication

aws_authentication = AwsCredentialsAuthentication(
    access_key_secret_name='your-access-key-frogml-secret',
    secret_key_secret_name='your-secret-key-frogml-secret'
)
Define Date Partition Columns (Optional)
time_partition_columns: TimePartitionColumns
Description: Define date partition columns correlated with date_created_column.
Optional: Yes (highly recommended)
Options:
DatePartitionColumns
Fields:
date_column_name: str: Mandatory
date_format: str: Mandatory
Example:
from frogml.core.feature_store.data_sources.time_partition_columns import DatePartitionColumns

time_partition_columns = DatePartitionColumns(date_column_name='date_pt', date_format='%Y%m%d')
TimeFragmentedPartitionColumns
Fields:
year_partition_column: YearFragmentColumn: Mandatory
month_partition_column: MonthFragmentColumn: Optional (must be set if day_partition_column is set)
day_partition_column: DayFragmentColumn: Optional
Examples:
For year=2022/month=01/day=05:

from frogml.core.feature_store.data_sources.time_partition_columns import (
    ColumnRepresentation,
    TimeFragmentedPartitionColumns,
    YearFragmentColumn,
    MonthFragmentColumn,
    DayFragmentColumn,
)

time_partition_columns = TimeFragmentedPartitionColumns(
    YearFragmentColumn("year", ColumnRepresentation.NumericColumnRepresentation),
    MonthFragmentColumn("month", ColumnRepresentation.NumericColumnRepresentation),
    DayFragmentColumn("day", ColumnRepresentation.NumericColumnRepresentation),
)

For year=2022/month=January/day=5:

from frogml.core.feature_store.data_sources.time_partition_columns import (
    ColumnRepresentation,
    DayFragmentColumn,
    MonthFragmentColumn,
    TimeFragmentedPartitionColumns,
    YearFragmentColumn,
)

time_partition_columns = TimeFragmentedPartitionColumns(
    YearFragmentColumn("year", ColumnRepresentation.NumericColumnRepresentation),
    MonthFragmentColumn("month", ColumnRepresentation.TextualColumnRepresentation),
    DayFragmentColumn("day", ColumnRepresentation.NumericColumnRepresentation),
)
Unity Catalog
An example of how to use the SDK for a Unity Catalog source is shown below.
Before creating a Unity Catalog connector, make sure you have the following:
A personal access token. See How to create a personal access token.
Connectivity between the JFrog ML environment and the Unity Catalog host.
There are two distinct ways to use the Unity Catalog connector:
1. Providing a query:
unitycatalog_source = UnityCatalogSource(
name="my_source",
description="some unity catalog data source",
date_created_column="DATE_CREATED",
uri="https://<Databricks Address>/api/2.1/unity-catalog",
catalog="unity_catalog_name",
schema="schema_name",
query="select * from table_name",
personal_access_token_secret_name="jfrogml_secret_unity_catalog_pat_token", # use secret service
)

2. Providing a table:
unitycatalog_source = UnityCatalogSource(
name="my_source",
description="some unity catalog data source",
date_created_column="DATE_CREATED",
uri="https://<Databricks Address>/api/2.1/unity-catalog",
catalog="unity_catalog_name",
schema="schema_name",
table="table_name",
personal_access_token_secret_name="jfrogml_secret_unity_catalog_pat_token", # use secret service
)

Limitation: Feature sets cannot use multiple Unity Catalog data sources if they are configured to different catalogs that share the same name.