SynxML Python

SynxML Python provides a Python SDK for data manipulation and machine learning on SynxML. It offers two main interfaces: a Ray-style interface for users familiar with Ray, and a DataFrame interface for general users.

Prerequisites

Before using SynxML Python, ensure the following resources are set up:

  • Create organization, account, and user/role via the DBaaS Admin Console.

  • Create a warehouse.

  • Create an ML cluster.

For details, see Use DBaaS Admin Console to create resources.

Prepare database

Ensure you have a database created and the synxml extension enabled.

  1. Create a database (for example, testdb) and connect to it:

    CREATE DATABASE IF NOT EXISTS testdb;
    \c testdb;
    
  2. Create and configure the extension (for example, using the synxml_auth role):

    CREATE EXTENSION IF NOT EXISTS synxml CASCADE;
    SELECT synxml.configure_auth_role('synxml_auth');
    

Enter the SynxML Python environment

To access the SynxML Python environment, you need to connect to the Kubernetes pod running the ML cluster. Follow these steps:

  1. Access the Kubernetes cluster: Log in to the Kubernetes environment where SynxDB Cloud is deployed.

  2. Identify the ML cluster pod: Run the following command to list all pods and find the one associated with your ML cluster.

    kubectl get po -A
    

    Locate the pod name (for example, mlc1--worker-d7mqd) and its namespace (for example, org1-usr2-70b2facb).

  3. Enter the pod: Use the kubectl exec command to open a bash shell in the ML cluster pod.

    kubectl exec -it <pod_name> -n <namespace> -- bash
    

    For example:

    kubectl exec -it mlc1--worker-d7mqd -n org1-usr2-70b2facb -- bash
    
  4. Start Python: After logging in, you will see the Ray container prompt. Start the Python interpreter.

    python3
    

    You should see an output similar to:

    Defaulted container "ray" out of: ray, wait-gcs-ready (init)
    (base) ray@mlc1--worker-d7mqd:~$ python3
    

    You are now in the SynxML Python environment.

Configure database

Connect to the database from Python and make sure the synxml extension is configured:

import psycopg2

# Database connection (update with your credentials)
# Replace <ip>, <port>, and <database_name> with your actual values
DB_URI = "postgresql://gpadmin@<ip>:<port>/<database_name>"


with psycopg2.connect(DB_URI) as conn:
    with conn.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS synxml CASCADE")
        cur.execute("SELECT synxml.configure_extension()")
        

If your environment needs a proxy to reach external resources (for example, to download datasets and pretrained models), configure it:

import os

os.environ["https_proxy"] = "http://<ip>:<port>"
os.environ["http_proxy"] = "http://<ip>:<port>"

Load and preprocess data

SynxML provides powerful data loading and preprocessing capabilities.

Load data from datasets

import ray
import pandas as pd
from datasets import load_dataset
from synxml.data import read_dataset, write_dataset

print("Loading sample datasets...")

# Load Iris dataset
iris_data = load_dataset('scikit-learn/iris')
iris_train = iris_data['train']
iris_train_ds = ray.data.from_huggingface(iris_train)


# Load California Housing dataset
housing_data = load_dataset('gvlassis/california_housing')
housing_train = housing_data['train']
housing_test = housing_data['test']
housing_train_ds = ray.data.from_huggingface(housing_train)
housing_test_ds = ray.data.from_huggingface(housing_test)


# Load ChnSentiCorp dataset
sentiment_data = load_dataset('lansinuote/ChnSentiCorp')
sentiment_train = sentiment_data['train']
sentiment_test = sentiment_data['test']
sentiment_train_ds = ray.data.from_huggingface(sentiment_train)
sentiment_test_ds = ray.data.from_huggingface(sentiment_test)


# Print dataset sizes
print(f"Iris train dataset: {iris_train_ds.count()} rows")
print(f"Housing train dataset: {housing_train_ds.count()} rows")
print(f"Housing test dataset: {housing_test_ds.count()} rows")
print(f"ChnSentiCorp train dataset: {sentiment_train_ds.count()} rows")
print(f"ChnSentiCorp test dataset: {sentiment_test_ds.count()} rows")

# Save to database
write_dataset(iris_train_ds, "iris_train", DB_URI)

write_dataset(housing_train_ds, "california_housing_train", DB_URI)
write_dataset(housing_test_ds, "california_housing_test", DB_URI)

write_dataset(sentiment_train_ds, "chnsenti_train", DB_URI)
write_dataset(sentiment_test_ds, "chnsenti_test", DB_URI)

print("Datasets saved to database!")

Load data from torchvision

import torch
from torchvision import datasets
from torchvision.transforms import ToTensor

def prepare_fashion_mnist(train=True):
    """Prepare FashionMNIST dataset"""
    data = datasets.FashionMNIST(
        root="~/data",
        train=train,
        download=True,
        transform=ToTensor()
    )

    # Convert to pandas DataFrame
    df = pd.DataFrame(list(data), columns=['X', 'y'])
    
    # Convert tensors to numpy arrays
    df['X'] = df['X'].apply(lambda r: r.to(torch.float64).numpy())
    
    return ray.data.from_pandas(df)

fashion_train = prepare_fashion_mnist(train=True)
fashion_test = prepare_fashion_mnist(train=False)

print(f"FashionMNIST train: {fashion_train.count()} samples")
print(f"FashionMNIST test: {fashion_test.count()} samples")

write_dataset(fashion_train, 'fashionmnist_train', DB_URI)
write_dataset(fashion_test, 'fashionmnist_test', DB_URI)

print("FashionMNIST datasets saved!")

The following variant stores each image as PNG bytes instead of a float array; this byte format is what the ResNet example later in this document consumes.

import io

import numpy as np
import pandas as pd
import ray
import torch
from PIL import Image
from torchvision import datasets
from torchvision.transforms import ToTensor

from synxml.data import write_dataset

def prepare_fashion_mnist_bytes(train=True):
    data = datasets.FashionMNIST(
        root="~/data",
        train=train,
        download=True,
        transform=ToTensor()
    )

    # Convert to pandas DataFrame
    df = pd.DataFrame(list(data), columns=['X', 'y'])
    
    # Transform tensor to bytes
    def tensor_to_bytes(tensor):
        img_array = (tensor.squeeze().numpy() * 255).astype(np.uint8)
        img = Image.fromarray(img_array)
        img_bytes = io.BytesIO()
        img.save(img_bytes, format='PNG')
        return img_bytes.getvalue()
    
    df['X'] = df['X'].apply(tensor_to_bytes)
    
    return ray.data.from_pandas(df)

fashion_train = prepare_fashion_mnist_bytes(train=True)
fashion_test = prepare_fashion_mnist_bytes(train=False)

print(f"FashionMNIST train: {fashion_train.count()} samples")
print(f"FashionMNIST test: {fashion_test.count()} samples")

# Write train and test to db 
write_dataset(fashion_train, 'fashionmnist_train_bytes', DB_URI)
write_dataset(fashion_test, 'fashionmnist_test_bytes', DB_URI)

# Sample 1% from fashion_train
fashion_sample = fashion_train.random_sample(fraction=0.01, seed=42)
print(f"FashionMNIST sample: {fashion_sample.count()} samples")
# Write sample to db
write_dataset(fashion_sample, 'fashionmnist_sample_bytes', DB_URI)
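Before writing anything to the database, you can sanity-check that the PNG encoding used by tensor_to_bytes is lossless with a quick local round trip (pure NumPy and Pillow, no Ray or database needed):

```python
import io

import numpy as np
from PIL import Image

# A fake 28x28 grayscale image with known pixel values
rng = np.random.default_rng(42)
img_array = rng.integers(0, 256, size=(28, 28), dtype=np.uint8)

# Encode to PNG bytes, mirroring tensor_to_bytes above
buf = io.BytesIO()
Image.fromarray(img_array).save(buf, format='PNG')
png_bytes = buf.getvalue()

# Decode and confirm the round trip is exact (PNG is lossless)
decoded = np.array(Image.open(io.BytesIO(png_bytes)))
assert decoded.shape == (28, 28)
assert (decoded == img_array).all()
print("PNG round trip is lossless")
```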

Transform and preprocess data

iris_dataset = read_dataset('iris_train', DB_URI)

print("Original Iris dataset:")
iris_dataset.show(3)

# Define transformation function
def transform_iris(batch):
    """Transform Iris dataset"""
    # Map species to numeric values
    species_map = {
        'Iris-setosa': 0,
        'Iris-versicolor': 1,
        'Iris-virginica': 2
    }
    batch['Species_encoded'] = batch['Species'].map(species_map)
    
    # Create new features
    batch['Sepal_ratio'] = batch['SepalLengthCm'] / batch['SepalWidthCm']
    batch['Petal_ratio'] = batch['PetalLengthCm'] / batch['PetalWidthCm']
    
    # Drop original Species column
    batch = batch.drop(columns=['Species'])
    
    return batch

# Apply transformation
iris_transformed = iris_dataset.map_batches(transform_iris, batch_format="pandas")

print("\nTransformed Iris dataset:")
iris_transformed.show(3)

# Save transformed data
write_dataset(iris_transformed, "iris_processed", DB_URI)

print("Transformed data saved!")
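Because transform_iris operates on plain pandas batches, you can verify its logic on a tiny in-memory DataFrame before running it against the database (the two sample rows below are made up for illustration):

```python
import pandas as pd

def transform_iris(batch):
    """Same transformation as above: encode species, add ratio features."""
    species_map = {'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2}
    batch['Species_encoded'] = batch['Species'].map(species_map)
    batch['Sepal_ratio'] = batch['SepalLengthCm'] / batch['SepalWidthCm']
    batch['Petal_ratio'] = batch['PetalLengthCm'] / batch['PetalWidthCm']
    return batch.drop(columns=['Species'])

batch = pd.DataFrame({
    'SepalLengthCm': [5.1, 7.0],
    'SepalWidthCm': [3.5, 3.2],
    'PetalLengthCm': [1.4, 4.7],
    'PetalWidthCm': [0.2, 1.4],
    'Species': ['Iris-setosa', 'Iris-versicolor'],
})

out = transform_iris(batch)
assert list(out['Species_encoded']) == [0, 1]
assert 'Species' not in out.columns
print(out[['Species_encoded', 'Sepal_ratio', 'Petal_ratio']])
```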

Use traditional ML models

SynxML provides a comprehensive suite of traditional machine learning models through sklearn integration.

Use support vector machine (SVM)

  • An example of training:

    from synxml.models import SVC
    
    svc = SVC(
        C=1.0,
        kernel='rbf',
        gamma='scale',
        probability=True,  # Enable probability estimates
        random_state=42
    )
    
    print("Training SVM on Iris dataset...")
    svc.fit(
        train_tblname='iris_processed',
        model_name='iris_svm_classifier',
        train_config={
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'sample_weight': None
        },
        db_uri=DB_URI
    )
    
    print("SVM training completed!")
    print(f"Model saved as: iris_svm_classifier")
    
    print(f"Model Info: {svc.model}")
    
  • An example of prediction:

    from synxml.models import SVC
    
    svc = SVC.from_model_name('iris_svm_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with SVC...")
    svc.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_svm_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    

Use logistic regression

  • An example of training:

    from synxml.models import LogisticRegression
    
    logreg = LogisticRegression(
        penalty='l2',
        C=1.0,
        solver='lbfgs',
        multi_class='auto',
        max_iter=1000,
        random_state=42
    )
    
    print("Training Logistic Regression on Iris dataset...")
    logreg.fit(
        train_tblname='iris_processed',
        model_name='iris_logreg_classifier',
        train_config={
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'sample_weight': None
        },
        db_uri=DB_URI
    )
    
    print("Logistic Regression training completed!")
    print(f"Model saved as: iris_logreg_classifier")
    
    print(f"Model Info: {logreg.model}")
    
  • An example of prediction:

    from synxml.models import LogisticRegression
    
    logreg = LogisticRegression.from_model_name('iris_logreg_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with LogisticRegression...")
    logreg.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_logreg_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    

Use AdaBoost classifier

  • An example of training:

    from synxml.models import AdaBoostClassifier
    
    adaboost = AdaBoostClassifier(
        n_estimators=50,
        learning_rate=1.0,
        algorithm='SAMME',
        random_state=42
    )
    
    print("Training AdaBoost on Iris dataset...")
    adaboost.fit(
        train_tblname='iris_processed',
        model_name='iris_adaboost_classifier',
        train_config={
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'sample_weight': None
        },
        db_uri=DB_URI
    )
    
    print("AdaBoost training completed!")
    print(f"Model saved as: iris_adaboost_classifier")
    
    print(f"Model Info: {adaboost.model}")
    
  • An example of prediction:

    from synxml.models import AdaBoostClassifier
    
    adaboost = AdaBoostClassifier.from_model_name('iris_adaboost_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with AdaBoost...")
    adaboost.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_adaboost_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    

Use random forest classifier

  • An example of training:

    from synxml.models import RandomForestClassifier
    
    rfc = RandomForestClassifier(
        n_estimators=100,
        max_depth=10,
        random_state=42
    )
    
    print("Training Random Forest classifier...")
    rfc.fit(
        train_tblname='iris_processed',
        model_name='iris_rf_classifier',
        train_config={
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'num_workers': 4,
            'use_gpu': False,
        },
        db_uri=DB_URI
    )
    
    print("Random Forest training completed!")
    print(f"Model saved as: iris_rf_classifier")
    print(f"Model Info: {rfc.model}")
    
  • An example of prediction:

    from synxml.models import RandomForestClassifier
    
    rfc = RandomForestClassifier.from_model_name('iris_rf_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with Random Forest...")
    rfc.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_rf_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_rf_predictions")
    

Use KMeans clustering

  • An example of training:

    from synxml.models import KMeans
    
    kms = KMeans(n_clusters=3, init='k-means++', algorithm='lloyd')
    #kms = KMeans()
    kms.fit(
        train_tblname='iris_processed',
        modelname='kms_iris',
        train_config={
            #'y': None,
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'num_workers':1,
            'use_gpu':False,
            'scoring': 'davies_bouldin',
        },
        db_uri=DB_URI
    )
    
  • An example of prediction:

    from synxml.models import KMeans
    
    kms = KMeans.from_model_name('lloyd_a488af3bf68d4a578030ccb0cfd792e8', db_uri_models=DB_URI)
    
    kms.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_kms_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio', 'Species_encoded'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_kms_predictions")
    

Use DBSCAN clustering

An example of training:

from synxml.models import DBSCAN

dbs = DBSCAN()
dbs.fit(
    train_tblname='iris_processed',
    modelname='dbs_iris',
    train_config={
        #'y': None,
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'num_workers':1,
        'use_gpu':False,
        'scoring': 'davies_bouldin',
    },
    db_uri=DB_URI
)

Use MeanShift clustering

  • An example of training:

    from synxml.models import MeanShift
    
    msf = MeanShift()
    msf.fit(
        train_tblname='iris_processed',
        modelname='msf_iris',
        train_config={
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'num_workers':1,
            'use_gpu':False,
        },
        db_uri=DB_URI
    )
    
  • An example of prediction:

    from synxml.models import MeanShift
    
    msf = MeanShift.from_model_name('MeanShift_80d6ee0647c64c94b98ee81ed066409c', db_uri_models=DB_URI)
    
    msf.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_msf_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio', 'Species_encoded'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_msf_predictions")
    


Use SpectralClustering

  • An example of training:

    from synxml.models import SpectralClustering
    
    opt = SpectralClustering()
    opt.fit(
        train_tblname='iris_processed',
        modelname='opt_iris',
        train_config={
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'num_workers':1,
            'use_gpu':False,
        },
        db_uri=DB_URI
    )
    
  • An example of prediction:

    from synxml.models import SpectralClustering
    
    spc = SpectralClustering.from_model_name('SpectralClustering_4cf9d2971ca7413f8c015085af563158', db_uri_models=DB_URI)
    
    spc.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_spc_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio', 'Species_encoded'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_spc_predictions")
    

Use XGBoost classification

  • An example of training:

    from synxml.models import XGBoost
    
    # Initialize XGBoost classifier
    xgb = XGBoost()
    
    # Train on the processed Iris data
    print("Training XGBoost classifier on the Iris dataset...")
    xgb.fit(
        train_tblname='iris_processed',
        model_name='iris_xgb_classifier',
        train_config={
            'objective': 'multi:softmax',
            'num_class': 3,
            'num_boost_round': 100,
            'early_stopping_rounds': 10,
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'num_workers': 1,
            'use_gpu': False,
        },
        db_uri=DB_URI
    )
    
    print("XGBoost training completed!")
    print(f"Model saved as: iris_xgb_classifier")
    
    print(f"Model Info: {xgb.model.attributes()}")
    
  • An example of prediction:

    from synxml.models import XGBoost
    
    xgb = XGBoost.from_model_name('iris_xgb_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with XGBoost...")
    xgb.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_xgb_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_xgb_predictions")
    

Use XGBoost regression

  • An example of training:

    from synxml.models import XGBoost
    
    xgb = XGBoost()
    
    print("Training XGBoost regressor on California Housing data...")
    xgb.fit(
        train_tblname='california_housing_train',
        model_name='california_housing_xgb_regressor',
        train_config={
            'objective': 'reg:squarederror',
            'y': 'MedHouseVal',
            'num_boost_round': 100,
            'early_stopping_rounds': 10,
            'num_workers': 1
        },
        db_uri=DB_URI,
        valid_tblname='california_housing_test'
    )
    
    print("XGBoost training completed!")
    print(f"Model saved as: california_housing_xgb_regressor")
    
    print(f"Model Info: {xgb.model.attributes()}")
    
  • An example of prediction:

    from synxml.models import XGBoost
    
    xgb = XGBoost.from_model_name('california_housing_xgb_regressor', db_uri_models=DB_URI)
    
    print("Making predictions with XGBoost...")
    xgb.batch_predict(
        input_tblname='california_housing_test',
        output_tblname='california_housing_xgb_predictions',
        predict_config={
            'y': 'MedHouseVal'
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: california_housing_xgb_predictions")
    

Use LightGBM classification

  • An example of training:

    from synxml.models import LightGBM
    
    lgb = LightGBM()
    
    print("Training LightGBM classifier...")
    lgb.fit(
        train_tblname='iris_processed',
        model_name='iris_lgb_classifier',
        train_config={
            'objective': 'multiclass',
            'num_class': 3,
            'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'num_iterations': 100,
            'learning_rate': 0.1,
            'num_workers': 2
        },
        db_uri=DB_URI
    )
    
    print("LightGBM training completed!")
    print(f"Model saved as: iris_lgb_classifier")
    
    print(f"Model Info: {lgb.model}")
    
  • An example of prediction:

    from synxml.models import LightGBM
    
    lgb = LightGBM.from_model_name('iris_lgb_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with LightGBM...")
    lgb.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_lgb_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_lgb_predictions")
    

Use CatBoost classification

  • An example of training:

    DB_URI = "postgresql://<user>@<ip>:<port>/<db_name>"  # replace with your connection info
    import ray
    
    ray.data.DataContext.get_current().use_ray_tqdm = False
    
    from synxml.models import CatBoost
    
    ctb = CatBoost()
    
    print("Training CatBoost classifier...")
    
    
    ctb.fit(
        train_tblname='iris_processed',
        model_name='iris_ctb_classifier',
        train_config={
            'iterations': 100,
            'learning_rate': 0.01,
            'loss_function': 'MultiClass',
            'depth': 3,
            'use_gpu': False,
            'X': ['Id', 'SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
            'y': 'Species_encoded',
            'cat_features': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("CatBoostClassifier training completed!")
    print(f"Model saved as: iris_ctb_classifier")
    
    print(f"Model Info: {ctb.model}")
    
  • An example of prediction:

    DB_URI = "postgresql://<user>@<ip>:<port>/<db_name>"  # replace with your connection info
    import ray
    
    ray.data.DataContext.get_current().use_ray_tqdm = False
    
    from synxml.models import CatBoost
    
    ctb = CatBoost.from_model_name('iris_ctb_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with CatBoostClassifier...")
    ctb.batch_predict(
        input_tblname='iris_processed',
        output_tblname='iris_ctb_predictions',
        predict_config={
            'y': 'Species_encoded',
            'drop_columns': ['Petal_ratio', 'Sepal_ratio'],
            'keep_columns': ['Id']
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: iris_ctb_predictions")
    

Use deep learning models

SynxML supports PyTorch-based deep learning models for complex tasks.

Use multi-layer perceptron (MLP)

  • An example of training:

    from synxml.models import MLP
    
    mlp = MLP(
        input_size=784,          # 28x28 flattened images
        hidden_config=[256, 128], # Two hidden layers
        output_size=10,          # 10 fashion categories
        activation='relu',
        dropout=0.2
    )
    
    print("Training MLP on FashionMNIST...")
    mlp.fit(
        train_tblname='fashionmnist_train',
        model_name='fashion_mlp_classifier',
        train_config={
            'X': 'X',
            'y': 'y',
            'num_epochs': 5,
            'num_workers': 4,
            'use_gpu': False,
            'per_device_batch_size': 64,
            'learning_rate': 0.001
        },
        db_uri=DB_URI
    )
    
    print("MLP training completed!")
    print(f"Model saved as: fashion_mlp_classifier")
    
  • An example of prediction:

    from synxml.models import MLP
    
    mlp = MLP.from_model_name('fashion_mlp_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with MLP...")
    mlp.batch_predict(
        input_tblname='fashionmnist_test',
        output_tblname='fashion_mlp_classifier',
        predict_config={
            'y': 'y',
            "compute_label": True
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: fashion_mlp_classifier")
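For a sense of the model's size, the layer widths above determine the parameter count directly (a back-of-the-envelope check, assuming standard fully connected layers with biases):

```python
# Layer widths from the MLP above: 784 -> 256 -> 128 -> 10
sizes = [784, 256, 128, 10]

# A fully connected layer with n_in inputs and n_out outputs has
# n_in * n_out weights plus n_out biases; dropout and ReLU add none
params = sum(n_in * n_out + n_out for n_in, n_out in zip(sizes, sizes[1:]))
print(f"Trainable parameters: {params}")  # 235146
```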
    

Use convolutional neural network (ResNet)

  • An example of training:

    from synxml.models import ResNet
    
    # Initialize model
    resnet = ResNet(
        pretrained_model_name="resnet-50",
    )
    
    # Training config
    train_config = {
        'X': 'X',
        'y': 'y', 
        'num_class': 10,
        'num_train_epochs': 1,
        'learning_rate': 2e-5,
        'per_device_train_batch_size': 8,
        'use_gpu': False
    }
    
    # Start training
    resnet.fit(
        train_tblname='fashionmnist_sample_bytes',
        model_name='fashion_resnet_classifier',
        train_config=train_config,
        valid_tblname='fashionmnist_sample_bytes',
        db_uri=DB_URI
    )
    
  • An example of prediction:

    from synxml.models import ResNet
    
    resnet = ResNet.from_model_name('fashion_resnet_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with ResNet...")
    resnet.batch_predict(
        input_tblname='fashionmnist_sample_bytes',
        output_tblname='fashion_resnet_predictions',
        predict_config={
            'y': 'y',
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: fashion_resnet_predictions")
    

Use bidirectional encoder representations from transformers (BERT)

  • An example of training:

    from synxml.models import Bert
    
    # Initialize model
    bert = Bert(
        pretrained_model_name="bert-base-chinese",
    )
    
    # Training config
    train_config = {
        'X': 'review',
        'y': 'label', 
        'num_class': 2,
        'num_train_epochs': 1,
        'learning_rate': 2e-5,
        'per_device_train_batch_size': 32,
        'use_gpu': False
    }
    
    
    bert.fit(
        train_tblname='waimai_mini_train',
        model_name='waimai_bert_classifier',
        train_config=train_config,
        valid_tblname='waimai_mini_val',
        db_uri=DB_URI
    )
    
  • An example of prediction:

    from synxml.models import Bert
    
    bert = Bert.from_model_name('waimai_bert_classifier', db_uri_models=DB_URI)
    
    print("Making predictions with Bert...")
    bert.batch_predict(
        input_tblname='waimai_mini_val',
        output_tblname='waimai_mini_val_pred',
        predict_config={
            'y': 'label',
            'X': 'review'
        },
        db_uri=DB_URI
    )
    
    print("Predictions completed!")
    print("Results saved to: waimai_mini_val_pred")
    

Use timeseries modeling: RNNBlockRegressor

  • An example of training:

    from synxml.timeseries.models import RNNBlockRegressor
    
    rnn = RNNBlockRegressor(
        in_chunk_len = 96,
        out_chunk_len = 24,
        rnn_type_or_module = "RNN",
        hidden_size = 128,
        embedding_size = 64,
        num_layers_recurrent = 1
    )
    
    rnn.fit(
        train_tblname="power_train", 
        train_config = {
            "num_epochs": 10,
            "num_worker": 10,
            "use_gpu": False,
            "batch_size": 32,
            "metrics": ["mse", "mae"]
        },
        variable_config = { # target variables and covariate variables supported
            "time_col": "timestamp", 
            "target_cols": ["power"],
            "observed_cov_cols": ["voltage"],
            "known_cov_cols": ["minute", "hour"]
        },
        valid_tblname="power_val",
        db_uri=DB_URI
    )
    
  • An example of forecasting:

    from synxml.timeseries.models import RNNBlockRegressor
    rnn = RNNBlockRegressor.from_model_name(
        "RNNBlockRegressor_18c997e6716b4f56bfebd4f7061a2c46",  # replace with your model name
        db_uri_models=DB_URI
    )
    
    pred_scaled = rnn.predict(input_tblname="power_val", output_tblname="power_predictions", db_uri=DB_URI)
    print("Forecasting completed.")
    print(pred_scaled)
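The two chunk lengths determine how the training table is sliced into supervised samples: each sample pairs 96 consecutive input steps with the 24 steps that follow. A small pure-Python illustration of the window arithmetic (the series length is a made-up example):

```python
# Window sizes from the RNNBlockRegressor configuration above
in_chunk_len, out_chunk_len = 96, 24

series_len = 1000  # hypothetical row count for power_train

# Each training sample pairs in_chunk_len input steps with the
# out_chunk_len steps that immediately follow them
num_windows = series_len - in_chunk_len - out_chunk_len + 1
print(f"{series_len} rows yield {num_windows} (input, target) windows")

# Index ranges of the last window: inputs end where targets begin,
# and the final target lands on the last row of the series
start = num_windows - 1
inputs = range(start, start + in_chunk_len)
targets = range(start + in_chunk_len, start + in_chunk_len + out_chunk_len)
assert targets[-1] == series_len - 1
```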
    

Use timeseries modeling: DLinearRegressor

  • An example of training:

    from synxml.timeseries.models import DLinearRegressor
    
    dlinear = DLinearRegressor(
        in_chunk_len = 96,
        out_chunk_len = 24,
        individual = False,
        kernel_size = 25
    )
    
    dlinear.fit(
        train_tblname="power_train", 
        train_config = {
            "num_epochs": 10,
            "num_worker": 10,
            "use_gpu": False,
            "batch_size": 32,
            "metrics": ["mse", "mae"]
        },
        variable_config = {
            "time_col": "timestamp", 
            "target_cols": ["power"], # only target variables supported
        },
        valid_tblname="power_val",
        db_uri=DB_URI
    )
    
  • An example of forecasting:

    from synxml.timeseries.models import DLinearRegressor
    dlinear = DLinearRegressor.from_model_name(
        "DLinearRegressor_d4803486a5a542e293f6d075226037fd",  # replace with your model name
        db_uri_models=DB_URI
    )
    
    pred_scaled = dlinear.predict(input_tblname="power_val", output_tblname="power_predictions", db_uri=DB_URI)
    print("Forecasting completed.")
    print(pred_scaled)
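DLinear-style models first split each input window into a trend component (a moving average over kernel_size points) and a seasonal remainder, then fit a linear layer to each part. A minimal pure-Python sketch of that decomposition (an illustration of the idea, not the SynxML internals):

```python
def moving_average(xs, kernel_size):
    """Centered moving average with edge padding (trend extraction)."""
    half = kernel_size // 2
    padded = [xs[0]] * half + list(xs) + [xs[-1]] * half
    return [sum(padded[i:i + kernel_size]) / kernel_size for i in range(len(xs))]

kernel_size = 25                             # matches the model config above
series = [float(i % 10) for i in range(50)]  # toy sawtooth "power" signal

trend = moving_average(series, kernel_size)
seasonal = [x - t for x, t in zip(series, trend)]

# By construction, the two components sum back to the original series
assert all(abs(x - (t + s)) < 1e-9 for x, t, s in zip(series, trend, seasonal))
print("trend + seasonal reconstructs the input exactly")
```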
    

Use AutoML capabilities

SynxML includes powerful AutoML features for automated model selection and hyperparameter tuning.

Use cross-validation

from synxml.models import RandomForestClassifier, cross_validate

# Initialize model for cross-validation
rf_cv_model = RandomForestClassifier()

# Configure cross-validation
cv_config = {
    'y': 'Species_encoded',
    'cv': 5,                    # 5-fold cross-validation
    'scoring': ['accuracy', 'f1_macro', 'precision'],
    'n_jobs': -1                # Use all available cores
}

print("Performing 5-fold cross-validation...")
cv_results = cross_validate(
    model=rf_cv_model,
    train_tblname='iris_processed',
    db_uri=DB_URI,
    train_config=cv_config
)

print("Cross-validation results:\n", cv_results)
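Conceptually, cv=5 partitions the table's rows into five disjoint folds and fits the model five times, each time scoring on the held-out fold. A plain-Python sketch of the index bookkeeping (real implementations usually shuffle and stratify as well):

```python
# What cv=5 does to the row indices (no shuffling shown)
n_rows, n_folds = 150, 5  # the Iris table has 150 rows
fold_size = n_rows // n_folds

folds = [list(range(i * fold_size, (i + 1) * fold_size)) for i in range(n_folds)]

# Each round holds out one fold for scoring and trains on the other four
for valid_fold in folds:
    held_out = set(valid_fold)
    train_rows = [i for i in range(n_rows) if i not in held_out]
    assert len(valid_fold) == 30 and len(train_rows) == 120

print("5 disjoint folds of 30 rows; each fit trains on 120 rows")
```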

Use hyperparameter tuning

from synxml.models import XGBoost, param_tune

# Initialize model for tuning
xgb_tune_model = XGBoost()

# Define parameter grid
tune_config = {
    'y': 'Species_encoded',
    'cv': 3,
    'scoring': 'accuracy',
    'n_jobs': -1,
    'param_grid': {
        'n_estimators': [100, 200],
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'subsample': [0.8, 1.0]
    }
}

print("Starting hyperparameter tuning...")
best_params = param_tune(
    model=xgb_tune_model,
    train_tblname='iris_processed',
    db_uri=DB_URI,
    train_config=tune_config
)

print("\nBest parameters found:")
for param, value in best_params.items():
    print(f"{param}: {value}")

Use Flow - Data processing pipelines

Flow provides a powerful pipeline system for building complex data processing workflows.

DB_URI = "postgresql://<user>@<ip>:<port>/<db_name>"

Use text processing workflow

from synxml.flow.pipeline.workflow import *
from synxml.flow.pipeline.pipeline import Pipeline

# 1. Define the nodes of the workflow
reader = DataSourceNode(
    id="reader_text",
    table_type=TableType.DIRTABLE,
    table_name="dirtable_pdf", # Replace with your dirtable name
    db_uri=DB_URI,
    file_types=["pdf"]
)

parser = FileParserNode(
    id="parser_pdf",
    file_types=FileType.PDF,
)

splitter = SplitterNode(
    id="splitter_text",
    max_length=1024,
    level=1,
)

writer = DataSinkNode(
    id="writer_summary",
    table_type=TableType.TABLE,
    table_name="summarization_results_quickstart",
    db_uri=DB_URI,
)

# 2. Assemble the workflow
text_workflow = WorkFlow(
    id="text_summarization_workflow",
    name="text_summarization_pipeline",
    nodes=[reader, parser, splitter, writer],
    dependencies = {
        "reader_text": [],
        "parser_pdf": ["reader_text"],
        "splitter_text": ["parser_pdf"],
        "writer_summary": ["splitter_text"]
    },
    db_role = "gpadmin"
)

# 3. Validate the workflow
workflow_json = text_workflow.model_dump_json()
text_workflow = WorkFlow.model_validate_json(workflow_json)

text_workflow.validate_dependencies()
text_workflow.validate_parallel_structure()

print("Text workflow created and validated.")

# 4. Build and run the pipeline (optional)
# Running the pipeline requires access to the specified data and services.
pipe = Pipeline.build_from_workflow(text_workflow)
ds = pipe.run()
ds.show()
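
The `dependencies` map is a DAG over node ids, and `validate_dependencies` presumably rejects unknown ids and cycles. For intuition, the same check can be sketched with the standard library's `graphlib` (an illustration of the idea, not SynxML's implementation):

```python
from graphlib import TopologicalSorter, CycleError

# The same node-id -> predecessors map used in the workflow above.
dependencies = {
    "reader_text": [],
    "parser_pdf": ["reader_text"],
    "splitter_text": ["parser_pdf"],
    "writer_summary": ["splitter_text"],
}

try:
    # static_order yields each node only after all of its predecessors.
    order = list(TopologicalSorter(dependencies).static_order())
    print("Execution order:", order)
except CycleError as exc:
    print("Dependency cycle:", exc)
```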

Use image processing workflow

from synxml.flow.pipeline.workflow import *
from synxml.flow.pipeline.pipeline import Pipeline

# 1. Define nodes
vision_reader = DataSourceNode(
    id="reader_vision",
    table_type=TableType.DIRTABLE,
    table_name="test_vision", # Replace with your dirtable name
    db_uri=DB_URI,
    file_types="jpg"
)

vision_parser = FileParserNode(
    id="parser_jpg",
    file_types=FileType.JPG,
    columns_mapping = {
        "input_cols": {"content": ("content", "str")},
        "output_cols": {"base64": ("base64", "str")}
    },
    extract_reserved_columns = ["file_type"]
)

# NOTE: You need a running vision service for this node.
vision_detector = VisionNode(
    id="vision_detector",
    service_type="local",
    task_type="detect",
    end_point="http://localhost:8000/yolo/v1/detect", # Replace with your endpoint
    columns_mapping = {
        "input_cols": {"base64": ("base64", "str")},
        "output_cols": {"vision_output": ("vision_output", "str")}
    }
)

vision_writer = DataSinkNode(
    id="writer_vision",
    table_type=TableType.TABLE,
    table_name="vision_results_quickstart", # Replace with your table name
    db_uri=DB_URI,
)

# 2. Assemble workflow
vision_workflow = WorkFlow(
    id="vision_workflow",
    name="vision_pipeline",
    nodes=[vision_reader, vision_parser, vision_detector, vision_writer]
)

# 3. Validate
workflow_json = vision_workflow.model_dump_json()
vision_workflow = WorkFlow.model_validate_json(workflow_json)

vision_workflow.validate_dependencies()
vision_workflow.validate_parallel_structure()

print("Vision workflow created and validated.")

# 4. Build and run (optional)
pipe = Pipeline.build_from_workflow(vision_workflow)
ds = pipe.run()
ds.show()
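
This workflow omits a `dependencies` map, so the nodes are presumably wired in list order. The parser's job is to turn raw file `content` into the `base64` column the detector consumes; that payload is plain base64, which you can produce and round-trip with the standard library (the bytes below are a stand-in, not a real JPG):

```python
import base64

raw = b"\xff\xd8\xff\xe0fake-jpg-bytes"  # stand-in for real image content
payload = base64.b64encode(raw).decode("ascii")

# Decoding must recover the original bytes exactly.
assert base64.b64decode(payload) == raw
print(payload[:16])
```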

Use speech recognition workflow

from synxml.flow.pipeline.workflow import *
from synxml.flow.pipeline.pipeline import Pipeline

# 1. Define nodes
speech_reader = DataSourceNode(
    id="reader_speech",
    table_type=TableType.DIRTABLE,
    table_name="test_dir1", # Replace with your dirtable name
    db_uri=DB_URI,
    file_types=["mp3"]
)

speech_parser = FileParserNode(
    id="parser_mp3",
    file_types=FileType.MP3,
    columns_mapping = {
        "input_cols": {"content": ("content", "str")},
        "output_cols": {"audio": ("audio_base64", "str")}
    },
    extract_reserved_columns = ["file_type"]
)

# NOTE: You need a running speech recognition service for this node.
speech_recognizer = SpeechRecognitionNode(
    id="recognizer_speech",
    service_type=SpeechRecognitionServiceType.API,
    model_name="SenseVoiceSmall",
    end_point="http://10.14.10.1:8000/sensevoice/v1/asr", # Replace with your endpoint
    columns_mapping = {
        "input_cols": {"audio": ("audio_base64", "str")},
        "output_cols": {"text": ("text", "str")}
    },
)

speech_writer = DataSinkNode(
    id="writer_speech",
    table_type=TableType.TABLE,
    table_name="speech_results_quickstart",
    db_uri=DB_URI,
)

# 2. Assemble workflow
speech_workflow = WorkFlow(
    id="speech_workflow",
    name="speech_pipeline",
    nodes=[speech_reader, speech_parser, speech_recognizer, speech_writer],
    dependencies = {
        "reader_speech": [],
        "parser_mp3": ["reader_speech"],
        "recognizer_speech": ["parser_mp3"],
        "writer_speech": ["recognizer_speech"]
    },
    db_role = "gpadmin"
)

# 3. Validate
workflow_json = speech_workflow.model_dump_json()
speech_workflow = WorkFlow.model_validate_json(workflow_json)

speech_workflow.validate_dependencies()
speech_workflow.validate_parallel_structure()


print("Speech workflow created and validated.")

# 4. Build and run (optional)
pipe = Pipeline.build_from_workflow(speech_workflow)
ds = pipe.run()
ds.show()

Use RAG

SynxML Python includes features for building retrieval-augmented generation (RAG) applications.

Use text embeddings

from synxml.reqa import TextEmbedder

embedder = TextEmbedder(
    "/tmp/models/jina-embeddings-v2-base-zh", # replace to your model path
    max_seq_length=512,
    use_gpu=False
)

embedder.embed("Hi")

Use multi-modal embeddings

from synxml.reqa import MultimodalEmbedder

multi_embedder = MultimodalEmbedder(
    "/tmp/models/bge-vl-base", # replace to your model path
    use_gpu=False
)

multi_embedder.embed([{"text": "Hi", "image": "iVBORw0KGgoAAAANSUhEUgAAAAoAAAAKCAIAAAACUFjqAAAAE0lEQVR4nGP8z4APMOGVZRip0gBBLAETee26JgAAAABJRU5ErkJggg=="}])
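
The `image` value is a base64-encoded PNG (a tiny test image in this example). Before sending your own images, you can sanity-check the payload with the standard library:

```python
import base64

# The same test payload used in the embed call above.
img_b64 = ("iVBORw0KGgoAAAANSUhEUgAAAAoAAAAKCAIAAAACUFjqAAAAE0lEQVR4nGP8"
           "z4APMOGVZRip0gBBLAETee26JgAAAABJRU5ErkJggg==")
data = base64.b64decode(img_b64)

# A valid PNG always starts with the 8-byte signature \x89PNG\r\n\x1a\n.
print(data[:8] == b"\x89PNG\r\n\x1a\n")  # → True
```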

Use ReRanking

from synxml.reqa import ReRanker

reranker = ReRanker(
    "/tmp/models/jina-embeddings-v2-base-zh", # replace to your model path
    max_seq_length=512,
    use_gpu=False
)

reranker.rank("Hi", ["Hello", "Hi", "Goodbye"])
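
`rank` scores each candidate against the query and returns them reordered by relevance. As a rough, purely illustrative stand-in for what a reranker does (token overlap instead of a neural model):

```python
def naive_rank(query, candidates):
    """Order candidates by shared lowercase tokens with the query (toy scoring)."""
    q = set(query.lower().split())
    scored = [(len(q & set(c.lower().split())), c) for c in candidates]
    return [c for score, c in sorted(scored, key=lambda t: -t[0])]

print(naive_rank("Hi", ["Hello", "Hi", "Goodbye"]))  # "Hi" ranks first: exact overlap
```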

Use vector knowledge base

from synxml.reqa import DocVKB
from synxml.client import EmbeddingAPI
embedder = EmbeddingAPI(
    "http://10.14.10.1:8000/embedding/v1/embeddings",  # replace with your own endpoint
    "jina-embeddings-v2-base-zh"  # replace with your own model name
    )

vkb = DocVKB(
    DB_URI,
    kb_table_name="kb_0fa280128054413aaf47adcc0f5cb8ff",  # replace with your own table name
    embedder=embedder,
)

vkb.embedding_search("Hi")
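
`embedding_search` embeds the query and returns the nearest stored chunks. Conceptually it is a top-k nearest-neighbour scan, which can be sketched in plain Python (toy two-dimensional vectors standing in for stored embeddings):

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

# Toy knowledge base: (chunk, embedding) pairs.
kb = [
    ("greeting chunk", [0.1, 0.9]),
    ("farewell chunk", [0.9, 0.1]),
]

def top_k(query_vec, k=1):
    """Return the k chunks whose embeddings are most similar to the query."""
    return sorted(kb, key=lambda row: -cosine(query_vec, row[1]))[:k]

print(top_k([0.2, 0.8]))  # the greeting chunk is closest
```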

Best practices and tips

Follow these best practices for optimal results with SynxML Python.

Use data management best practices

  • ✓ Always validate data quality before training.

  • ✓ Use appropriate data types for columns.

  • ✓ Implement proper train/test splits.

  • ✓ Handle missing values appropriately.

  • ✓ Normalize/scale numerical features.

  • ✓ Encode categorical variables properly.

  • ✓ Monitor data drift in production.

  • ✓ Use version control for datasets.

  • ✓ Document data sources and transformations.

  • ✓ Implement data privacy and security measures.

Model training tips

  • ✓ Start with simple models before complex ones.

  • ✓ Use cross-validation for robust evaluation.

  • ✓ Tune hyperparameters systematically.

  • ✓ Monitor for overfitting and underfitting.

  • ✓ Use appropriate evaluation metrics.

  • ✓ Validate model assumptions.

  • ✓ Consider ensemble methods.

  • ✓ Document model configurations.

  • ✓ Save model artifacts properly.

  • ✓ Test model inference performance.