Foundations

Feature Engineering and Data Pipelines

Lesson 4 of 4 | Estimated Time: 45 min

Raw features are rarely optimal for machine learning. Feature engineering, the practice of deriving informative features from raw data, often matters more than the choice of model. Wrapped in pipelines that make every transformation reproducible, it becomes a systematic process rather than trial and error.

Feature Selection: Removing Noise

Irrelevant or redundant features add noise, increase training time, and can hurt generalization. Feature selection identifies the features that actually carry signal:

from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif, RFE
from sklearn.ensemble import RandomForestClassifier
import numpy as np
import pandas as pd  # needed for the importance table in Method 4

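# Stand-in data so the examples run end to end; replace with your own X, y.
# weights=[0.9, 0.1] makes the classes imbalanced (an illustrative choice).
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=20, n_informative=5,
                           weights=[0.9, 0.1], random_state=42)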
n_features_to_select = 10

# Method 1: Statistical feature selection
selector = SelectKBest(score_func=f_classif, k=n_features_to_select)
X_selected = selector.fit_transform(X, y)

# See which features were selected
selected_mask = selector.get_support()
selected_features = np.array([f'feature_{i}' for i in range(X.shape[1])])[selected_mask]
print(f"Selected features: {selected_features}")

# Method 2: Mutual information (captures non-linear relationships)
selector = SelectKBest(score_func=mutual_info_classif, k=n_features_to_select)
X_selected = selector.fit_transform(X, y)

# Method 3: Recursive Feature Elimination (trains model repeatedly)
rfe = RFE(RandomForestClassifier(n_estimators=100, random_state=42),
          n_features_to_select=n_features_to_select)
X_selected = rfe.fit_transform(X, y)
print(f"Selected features with RFE: {rfe.support_}")

# Method 4: Tree-based feature importance
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)

# Sort by importance
feature_importance = pd.DataFrame({
    'feature': [f'feature_{i}' for i in range(X.shape[1])],
    'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

print(feature_importance.head(10))
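
To see why the choice of score function matters, here is a minimal sketch on synthetic data. The data, the seed, and the variable names are assumptions made for illustration. The label depends only on the magnitude of the first feature, a relationship that the ANOVA F-test (which compares class means) is not designed to detect but that mutual information can pick up.

```python
import numpy as np
from sklearn.feature_selection import f_classif, mutual_info_classif

rng = np.random.default_rng(0)

# Synthetic data: 5 features, only feature 0 is informative
X_demo = rng.normal(size=(2000, 5))
# Label depends on |x0|, so the class means of x0 are nearly identical
y_demo = (np.abs(X_demo[:, 0]) > 1.0).astype(int)

f_scores, p_values = f_classif(X_demo, y_demo)
mi_scores = mutual_info_classif(X_demo, y_demo, random_state=0)

for i in range(X_demo.shape[1]):
    print(f"feature_{i}: F={f_scores[i]:.2f}  MI={mi_scores[i]:.3f}")

best_by_mi = int(np.argmax(mi_scores))
print(f"Top feature by mutual information: feature_{best_by_mi}")
```

If the two methods disagree sharply on a feature, that is often a hint of a non-linear relationship worth inspecting by hand.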

Creating New Features: Domain Knowledge

The best features come from understanding your domain. Some common feature engineering techniques:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'user_id': [1, 2, 3, 4, 5],
    'signup_date': pd.to_datetime(['2020-01-15', '2019-06-22', '2021-03-10', '2018-11-30', '2020-07-05']),
    'last_login': pd.to_datetime(['2023-12-20', '2023-11-15', '2023-12-01', '2023-01-10', '2023-08-30']),
    'purchase_amount': [150, 500, 75, 2000, 250],
    'num_purchases': [3, 12, 2, 45, 8]
})

# Temporal features
df['account_age_days'] = (pd.Timestamp.now() - df['signup_date']).dt.days
df['days_inactive'] = (pd.Timestamp.now() - df['last_login']).dt.days
df['signup_year'] = df['signup_date'].dt.year
df['is_active'] = df['days_inactive'] < 30

# Ratio features (combine two columns into a per-unit quantity)
df['avg_purchase'] = df['purchase_amount'] / df['num_purchases']
df['purchase_frequency'] = df['num_purchases'] / df['account_age_days']

# Threshold indicator: flag purchase amounts above the 75th percentile
df['high_value'] = (df['purchase_amount'] > df['purchase_amount'].quantile(0.75)).astype(int)
# Interaction feature: product of two existing features
df['loyalty_score'] = df['num_purchases'] * df['account_age_days']

# Binning (converting continuous to categorical)
df['purchase_bucket'] = pd.cut(df['purchase_amount'],
                                bins=[0, 100, 500, 1000, float('inf')],
                                labels=['low', 'medium', 'high', 'very_high'])

print(df.head())
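
Temporal features computed against the current time change on every run, and ratio features can divide by zero. The sketch below is one way to handle both, assuming a hypothetical helper `add_customer_features` and a hypothetical `as_of` reference date that the caller supplies. Neither name comes from the lesson itself.

```python
import numpy as np
import pandas as pd

def add_customer_features(df, as_of):
    """Return a copy of df with derived features computed relative to as_of."""
    out = df.copy()
    out['account_age_days'] = (as_of - out['signup_date']).dt.days
    out['days_inactive'] = (as_of - out['last_login']).dt.days
    # Replace zero denominators with NaN so the ratio is missing rather than inf
    out['avg_purchase'] = out['purchase_amount'] / out['num_purchases'].replace(0, np.nan)
    out['purchase_frequency'] = out['num_purchases'] / out['account_age_days'].replace(0, np.nan)
    return out

# Second row is an edge case: signed up on the reference date, no purchases
demo = pd.DataFrame({
    'signup_date': pd.to_datetime(['2020-01-15', '2023-12-31']),
    'last_login': pd.to_datetime(['2023-12-20', '2023-12-31']),
    'purchase_amount': [150, 0],
    'num_purchases': [3, 0],
})

features = add_customer_features(demo, as_of=pd.Timestamp('2023-12-31'))
print(features[['account_age_days', 'days_inactive', 'avg_purchase', 'purchase_frequency']])
```

Passing the reference date explicitly means the same input always yields the same features, which makes training runs comparable.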

Handling Imbalanced Data

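When one class dominates (as in fraud or disease detection), a model can reach high accuracy by mostly predicting the majority class while missing the rare cases that matter. Several techniques address this: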

from sklearn.utils.class_weight import compute_class_weight
from imblearn.over_sampling import RandomOverSampler, SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline as ImbPipeline

print(f"Class distribution: {np.bincount(y)}")

# Technique 1: Class weights (adjust penalty for minority class)
class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)
class_weight_dict = dict(zip(np.unique(y), class_weights))

from sklearn.linear_model import LogisticRegression
model = LogisticRegression(class_weight=class_weight_dict)

# Technique 2: Oversampling minority class (SMOTE)
# In practice, resample only the training split so synthetic points never reach the test set
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Technique 3: Undersampling majority class
under = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = under.fit_resample(X, y)

# Technique 4: Combined approach
sampler = ImbPipeline([
    ('over', SMOTE(sampling_strategy=0.5, random_state=42)),
    ('under', RandomUnderSampler(sampling_strategy=0.8, random_state=42))
])
X_resampled, y_resampled = sampler.fit_resample(X, y)

print(f"Resampled distribution: {np.bincount(y_resampled)}")
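
The 'balanced' option in Technique 1 weights each class by n_samples / (n_classes * samples_in_class), so rarer classes receive larger weights. As a small check on toy labels (the 90/10 split is an assumption chosen for illustration), the sketch below compares scikit-learn's result with the same formula computed by hand.

```python
import numpy as np
from sklearn.utils.class_weight import compute_class_weight

# Toy labels: 90 negatives, 10 positives (illustrative only)
y_toy = np.array([0] * 90 + [1] * 10)
classes = np.unique(y_toy)

# Weights as computed by scikit-learn
sk_weights = compute_class_weight('balanced', classes=classes, y=y_toy)

# Same weights by hand: n_samples / (n_classes * samples_per_class)
manual_weights = len(y_toy) / (len(classes) * np.bincount(y_toy))

print(dict(zip(classes.tolist(), sk_weights.round(3).tolist())))
print(np.allclose(sk_weights, manual_weights))
```

Here the minority class ends up weighted nine times more heavily than the majority class, mirroring the 9:1 imbalance.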

Scikit-Learn Pipelines: Avoiding Data Leakage

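Pipelines chain preprocessing and modeling into a single estimator. Each transformer is fitted on the training data only and then applied unchanged to the test data, which keeps test-set statistics from leaking into training: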

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split, cross_val_score

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

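# Define pipeline
# Ridge is a regressor, so this block assumes a continuous target y
# (swap in a classifier when y holds class labels)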
pipeline = Pipeline([
    ('poly_features', PolynomialFeatures(degree=2)),
    ('scaler', StandardScaler()),
    ('model', Ridge(alpha=1.0))
])

# Fit pipeline: each transformer is fit and applied on the training data only;
# score/predict later call transform (never fit) on new data
pipeline.fit(X_train, y_train)

# Evaluate
train_score = pipeline.score(X_train, y_train)
test_score = pipeline.score(X_test, y_test)

print(f"Train R²: {train_score:.4f}")
print(f"Test R²: {test_score:.4f}")

# Cross-validation on pipeline
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"CV mean: {cv_scores.mean():.4f}")
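
One way to confirm that a pipeline keeps test data out of preprocessing is to inspect the fitted scaler. The sketch below uses synthetic regression data and hypothetical variable names (`X_demo`, `pipe`, and so on), all of which are assumptions for illustration, and checks that the scaler's learned means come from the training split rather than the full dataset.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_demo, y_demo = make_regression(n_samples=200, n_features=4, noise=10.0, random_state=0)
X_tr, X_te, y_tr, y_te = train_test_split(X_demo, y_demo, test_size=0.25, random_state=0)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('model', Ridge(alpha=1.0)),
])
pipe.fit(X_tr, y_tr)

scaler = pipe.named_steps['scaler']
# The learned means match the training split...
matches_train = np.allclose(scaler.mean_, X_tr.mean(axis=0))
# ...and differ from the means of the full dataset
matches_full = np.allclose(scaler.mean_, X_demo.mean(axis=0))
print(f"Matches training means: {matches_train}")
print(f"Matches full-data means: {matches_full}")
```

Scaling the full dataset before splitting would make the second check pass instead, which is exactly the leakage the pipeline is meant to avoid.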

ColumnTransformer: Heterogeneous Data

Real data has mixed types. ColumnTransformer applies different transformations to different columns:

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline

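# Sample data
X = pd.DataFrame({
    'age': [25, 35, 45, 28, 52],
    'income': [50000, 75000, 95000, 62000, 120000],
    'education': ['HS', 'Bachelor', 'Masters', 'Bachelor', 'PhD']
})
# Illustrative labels, one per row, so the fit below has matching lengths
y = np.array([0, 0, 1, 0, 1])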

numeric_features = ['age', 'income']
categorical_features = ['education']

# Define transformers for each group
numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ('onehot', OneHotEncoder(drop='first', sparse_output=False))
])

# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# Full pipeline: preprocessing + model
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('model', LogisticRegression(max_iter=200))
])

# Fit entire pipeline
full_pipeline.fit(X, y)
predictions = full_pipeline.predict(X)

Custom Transformers: Reusable Preprocessing

For complex preprocessing, create custom transformers:

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

class TextLengthTransformer(BaseEstimator, TransformerMixin):
    """Extract length features from text columns"""

    def __init__(self, columns=None):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        # With columns=None there is nothing to do (avoids iterating over None)
        for col in (self.columns or []):
            X[f'{col}_length'] = X[col].str.len()
            X[f'{col}_word_count'] = X[col].str.split().str.len()
        return X

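class OutlierRemover(BaseEstimator, TransformerMixin):
    """Clip values beyond n standard deviations (rows are kept, not dropped)"""

    def __init__(self, n_std=3):
        # Only store hyperparameters here; fitted attributes are set in fit()
        self.n_std = n_std

    def fit(self, X, y=None):
        # Learn per-column statistics from the training data only
        X = np.asarray(X, dtype=float)
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0)
        return self

    def transform(self, X):
        lower = self.mean_ - self.n_std * self.std_
        upper = self.mean_ + self.n_std * self.std_
        # np.clip broadcasts the per-column bounds across rows;
        # works for both NumPy arrays and DataFrames
        return np.clip(np.asarray(X, dtype=float), lower, upper)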

# Use in pipeline
pipeline = Pipeline([
    ('outlier_removal', OutlierRemover(n_std=3)),
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier(random_state=42))
])
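
As a usage sketch for the text-length idea, the example below defines a hypothetical `TextStats` transformer (the name and the toy reviews are assumptions, not part of the lesson) that returns only the derived numeric columns, so its output could feed a model directly. It also shows that `get_params`, inherited from `BaseEstimator`, exposes the constructor arguments that tools such as `GridSearchCV` rely on.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class TextStats(BaseEstimator, TransformerMixin):
    """Hypothetical transformer: return only length features for text columns."""

    def __init__(self, columns=None):
        self.columns = columns

    def fit(self, X, y=None):
        # Stateless: nothing is learned from the data
        return self

    def transform(self, X):
        cols = self.columns if self.columns is not None else list(X.columns)
        out = pd.DataFrame(index=X.index)
        for col in cols:
            out[f'{col}_length'] = X[col].str.len()
            out[f'{col}_word_count'] = X[col].str.split().str.len()
        return out

reviews = pd.DataFrame({'review': ['great product', 'bad', 'would buy again']})
stats = TextStats(columns=['review']).fit_transform(reviews)
print(stats)

# get_params comes from BaseEstimator and mirrors the __init__ arguments
params = TextStats(columns=['review']).get_params()
print(params)
```

Keeping `__init__` limited to storing its arguments is what lets scikit-learn clone the transformer during cross-validation.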

GridSearchCV on Pipelines

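Because a pipeline is itself an estimator, GridSearchCV can tune the parameters of any step, addressed as <step name>__<parameter>. Every candidate is refit on each training fold, preprocessing included: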

from sklearn.model_selection import GridSearchCV

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier(random_state=42))
])

# Tune model parameters here; preprocessing parameters use the same
# <step name>__<parameter> syntax (for example, scaler__with_mean)
param_grid = {
    'model__n_estimators': [50, 100, 200],
    'model__max_depth': [5, 10, None],
    'model__min_samples_split': [2, 5, 10]
}

grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='f1', n_jobs=-1)
grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")

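# Use best pipeline (already refit on all of X_train with the best parameters)
best_pipeline = grid_search.best_estimator_
# score() reports accuracy for classifiers, so this number is not directly
# comparable to the F1-based CV score above
test_score = best_pipeline.score(X_test, y_test)
print(f"Test accuracy: {test_score:.4f}")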
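
The same grid can include preprocessing parameters alongside model parameters. The following is a minimal sketch on synthetic regression data. The step names (`poly`, `scaler`, `model`), the grid values, and the data are assumptions chosen to keep the search small.

```python
from sklearn.datasets import make_regression
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler

X_demo, y_demo = make_regression(n_samples=150, n_features=3, noise=5.0, random_state=0)

pipe = Pipeline([
    ('poly', PolynomialFeatures(include_bias=False)),
    ('scaler', StandardScaler()),
    ('model', Ridge()),
])

# '<step name>__<parameter>' addresses a parameter of that step
grid = {
    'poly__degree': [1, 2],           # preprocessing parameter
    'model__alpha': [0.1, 1.0, 10.0], # model parameter
}

search = GridSearchCV(pipe, grid, cv=3, scoring='r2')
search.fit(X_demo, y_demo)

print(f"Candidates evaluated: {len(search.cv_results_['params'])}")
print(f"Best parameters: {search.best_params_}")
```

Because the polynomial expansion and scaling sit inside the pipeline, each of the 2 x 3 candidates is preprocessed separately within every fold.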

Key Takeaway

Feature engineering and pipelines are where much of the practical ML work happens. Spend time crafting meaningful features and building robust pipelines that prevent data leakage. This foundation often matters more than model complexity.

Practical Exercise

You’re building a model to predict customer churn. You have:

  • Customer data: age, tenure, monthly_charges, num_contracts
  • Behavioral data: calls_to_support, months_since_upgrade, avg_monthly_usage
  • Categorical: contract_type, internet_service, payment_method

Your task:

  1. Create 5-10 new features from existing ones
  2. Build a ColumnTransformer that handles numeric and categorical columns
  3. Create a Pipeline with preprocessing and classification
  4. Use GridSearchCV to optimize hyperparameters
  5. Evaluate and interpret feature importance

Starter code:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd
import numpy as np

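# Generate sample churn data (synthetic; columns mirror the lists above,
# value ranges for the added columns are illustrative)
np.random.seed(42)
n_samples = 1000
df = pd.DataFrame({
    'age': np.random.randint(18, 75, n_samples),
    'tenure': np.random.randint(1, 60, n_samples),
    'monthly_charges': np.random.uniform(20, 150, n_samples),
    'num_contracts': np.random.randint(1, 5, n_samples),
    'calls_to_support': np.random.randint(0, 10, n_samples),
    'months_since_upgrade': np.random.randint(0, 36, n_samples),
    'avg_monthly_usage': np.random.uniform(1, 500, n_samples),
    'contract_type': np.random.choice(['month', 'one_year', 'two_year'], n_samples),
    'internet_service': np.random.choice(['DSL', 'Fiber', 'Cable'], n_samples),
    'payment_method': np.random.choice(['card', 'bank_transfer', 'check'], n_samples)
})
# Labels are random, so expect scores near chance; the goal is the workflow
y = np.random.randint(0, 2, n_samples)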

# Your implementation:
# 1. Feature engineering
# 2. ColumnTransformer setup
# 3. Full Pipeline
# 4. GridSearchCV
# 5. Evaluation

# Expected output:
# - Pipeline with appropriate preprocessing
# - Hyperparameter tuning results
# - Feature importance
# - Test set performance

This exercise ties together everything: feature engineering, pipelines, and systematic hyperparameter optimization.