Text Classification using NLP

Building an NLP Pipeline: Text Classification

Objective

Build a complete Natural Language Processing (NLP) pipeline for text classification, covering both traditional machine learning and modern deep learning approaches. You’ll convert text data into numerical representations using embeddings and implement classification models to categorize text.


Learning Outcomes

By completing this project, you will:

  • Master fundamental NLP concepts and text processing techniques
  • Understand and implement different text embedding methods
  • Build text classification models using both traditional ML and deep learning
  • Evaluate and optimize NLP model performance
  • Gain practical experience with industry-standard NLP tools and libraries

Skills Gained

  • Building end-to-end NLP pipelines
  • Implementing modern text embedding techniques
  • Creating text classification systems
  • Using state-of-the-art NLP libraries and models
  • Evaluating and optimizing NLP models
  • Handling real-world text data challenges

Tools Required

# Core libraries
pip install transformers
pip install torch torchvision
pip install scikit-learn
pip install nltk
pip install gensim

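# Text processing
pip install spacy
pip install textblob
python -m spacy download en_core_web_sm

# Numerics, data handling, and plotting (used in the embedding, evaluation, and monitoring code)
pip install numpy pandas matplotlib seaborn plotly

# Serving, model registry, and caching (used in the deployment section)
pip install fastapi uvicorn mlflow redis

# Optional: for OpenAI embeddings
pip install openai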

Project Structure

text_classification/
│
├── data/
│   ├── raw/
│   │   ├── train.csv
│   │   └── test.csv
│   └── processed/
│
├── src/
│   ├── preprocessing.py
│   ├── embeddings/
│   │   ├── traditional.py
│   │   ├── transformer.py
│   │   └── openai.py
│   ├── models.py
│   └── evaluation.py
│
└── notebooks/
    ├── 1_data_exploration.ipynb
    ├── 2_embeddings.ipynb
    ├── 3_model_training.ipynb
    └── 4_evaluation.ipynb

Steps and Tasks

1. Data Preprocessing

First, let’s implement a robust text preprocessing pipeline:

import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import re

class TextPreprocessor:
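    def __init__(self):
        # Fetch the NLTK resources used below (skipped if already downloaded).
        # Newer NLTK releases load the tokenizer from 'punkt_tab' instead of 'punkt'.
        for resource in ('punkt', 'punkt_tab', 'stopwords', 'wordnet'):
            nltk.download(resource, quiet=True)
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))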
        
    def clean_text(self, text):
        """Basic text cleaning"""
        # Convert to lowercase
        text = text.lower()
        
        # Remove special characters and digits
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        
        # Tokenization
        tokens = word_tokenize(text)
        
        # Remove stopwords and lemmatize
        tokens = [self.lemmatizer.lemmatize(token) 
                 for token in tokens 
                 if token not in self.stop_words]
        
        return ' '.join(tokens)

Advanced preprocessing, adding named entities and part-of-speech tags with spaCy:

import spacy

class AdvancedTextPreprocessor:
    def __init__(self):
        self.basic_preprocessor = TextPreprocessor()
        self.nlp = spacy.load('en_core_web_sm')
        
    def extract_entities(self, text):
        """Extract named entities"""
        doc = self.nlp(text)
        entities = [(ent.text, ent.label_) for ent in doc.ents]
        return entities
    
    def get_pos_tags(self, text):
        """Get POS tags"""
        doc = self.nlp(text)
        return [(token.text, token.pos_) for token in doc]
    
    def clean_and_augment(self, text):
        """Clean text and add linguistic features"""
        # Basic cleaning
        clean_text = self.basic_preprocessor.clean_text(text)
        
        # Get entities and POS tags
        entities = self.extract_entities(text)
        pos_tags = self.get_pos_tags(text)
        
        return {
            'clean_text': clean_text,
            'entities': entities,
            'pos_tags': pos_tags
        }
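
To see what the cleaning steps do without downloading any NLTK data, the following dependency-free sketch mirrors `clean_text` on one sentence. It is a simplification under stated assumptions: `TINY_STOP_WORDS` is a hypothetical stand-in for NLTK's English stop-word list, whitespace splitting replaces `word_tokenize`, and lemmatization is skipped.

```python
import re

# Hypothetical stand-in for stopwords.words('english'); the real list is much longer.
TINY_STOP_WORDS = {"the", "a", "an", "is", "are", "was", "of", "and", "to", "in"}

def simple_clean(text: str) -> str:
    """Lowercase, strip non-letters, split on whitespace, drop stop words."""
    text = text.lower()
    text = re.sub(r"[^a-z\s]", "", text)  # digits and punctuation are removed
    tokens = text.split()
    return " ".join(t for t in tokens if t not in TINY_STOP_WORDS)

print(simple_clean("The 3 movies were GREAT, and the plot is clever!"))
# movies were great plot clever
```

Note that the digit and the punctuation disappear entirely. If numbers or symbols carry signal in your dataset (prices, ratings, emoticons), the regular expression in `clean_text` is the place to relax.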

2. Text Embeddings

Implement different embedding approaches:

from transformers import AutoTokenizer, AutoModel
import torch

class TextEmbedder:
    def __init__(self, model_name='bert-base-uncased'):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        
    def get_bert_embeddings(self, texts):
        """Get BERT embeddings for texts"""
        # Tokenize
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors='pt'
        )
        
        # Get embeddings
        with torch.no_grad():
            outputs = self.model(**encoded)
            embeddings = outputs.last_hidden_state[:, 0, :]
            
        return embeddings.numpy()
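
`get_bert_embeddings` uses the vector at position 0 (the `[CLS]` token) as the sentence embedding. A commonly used alternative is to average the token vectors while ignoring padding positions. The sketch below shows that masked mean on plain Python lists; `masked_mean` is a hypothetical helper, and in practice the same arithmetic would be applied to `outputs.last_hidden_state` and `encoded['attention_mask']`.

```python
def masked_mean(token_vectors, attention_mask):
    """Average token vectors where the mask is 1; padding positions (0) are ignored."""
    dim = len(token_vectors[0])
    total = [0.0] * dim
    count = 0
    for vec, keep in zip(token_vectors, attention_mask):
        if keep:
            count += 1
            for i, value in enumerate(vec):
                total[i] += value
    if count == 0:
        return total  # all padding: fall back to a zero vector
    return [value / count for value in total]

# Two real tokens followed by one padding token
tokens = [[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]
mask = [1, 1, 0]
print(masked_mean(tokens, mask))  # [2.0, 3.0]
```

Which pooling works better depends on the model and the task, so it is worth comparing both on a validation set.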

Additional embedding methods:

import numpy as np
from gensim.models import Word2Vec

class MultiEmbedder:
    def __init__(self):
        self.bert_embedder = TextEmbedder()
        
    def get_word2vec_embeddings(self, texts, vector_size=100):
        """Get Word2Vec embeddings"""
        # Train Word2Vec model
        tokenized_texts = [text.split() for text in texts]
        w2v_model = Word2Vec(
            sentences=tokenized_texts,
            vector_size=vector_size,
            window=5,
            min_count=1
        )
        
        # Get sentence embeddings by averaging word vectors
        embeddings = []
        for text in tokenized_texts:
            vectors = [w2v_model.wv[word] for word in text if word in w2v_model.wv]
            if vectors:
                embeddings.append(np.mean(vectors, axis=0))
            else:
                embeddings.append(np.zeros(vector_size))
                
        return np.array(embeddings)
    
    def get_openai_embeddings(self, texts):
        """Get OpenAI embeddings (openai>=1.0 client interface)"""
        from openai import OpenAI

        # The client reads the API key from the OPENAI_API_KEY environment variable
        client = OpenAI()

        embeddings = []
        for text in texts:
            response = client.embeddings.create(
                input=text,
                model="text-embedding-ada-002"
            )
            embeddings.append(response.data[0].embedding)

        return np.array(embeddings)
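
Whichever method produces the vectors, a quick sanity check is to confirm that texts on the same topic end up closer together than unrelated ones. The sketch below computes cosine similarity in plain Python. The three toy vectors are made up for illustration and are not real model outputs.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

# Toy 3-dimensional "embeddings"; real vectors would come from one of the methods above.
refund_1 = [0.9, 0.1, 0.0]
refund_2 = [0.8, 0.2, 0.1]
weather = [0.0, 0.1, 0.9]

same_topic = cosine_similarity(refund_1, refund_2)
different_topic = cosine_similarity(refund_1, weather)
print(same_topic > different_topic)  # True
```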

3. Model Implementation

Create text classification models:

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

class TextClassifier:
    def __init__(self, model_type='rf'):
        if model_type == 'rf':
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=None,
                min_samples_split=2,
                random_state=42
            )
        else:
            self.model = LogisticRegression(
                max_iter=1000,
                random_state=42
            )
    
    def train(self, X_train, y_train):
        """Train the classifier"""
        self.model.fit(X_train, y_train)
        
    def predict(self, X):
        """Make predictions"""
        return self.model.predict(X)

Deep learning model implementation:

import torch
import torch.nn as nn
import torch.optim as optim

class DeepTextClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim // 2, num_classes)
        )
        
    def forward(self, x):
        return self.layers(x)
    
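class TextClassificationTrainer:
    def __init__(self, model, device=None):
        # Fall back to CPU when no GPU is available
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        self.criterion = nn.CrossEntropyLoss()

    def train(self, train_loader, val_loader, epochs=10):
        optimizer = optim.Adam(self.model.parameters())

        for epoch in range(epochs):
            self.model.train()
            train_loss = 0.0
            for inputs, labels in train_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()

            # Validation
            val_loss, val_acc = self.evaluate(val_loader)
            print(f'Epoch {epoch+1}: Train Loss = {train_loss/len(train_loader):.4f}, '
                  f'Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}')

    def evaluate(self, data_loader):
        """Return mean batch loss and accuracy over a data loader"""
        self.model.eval()  # disables dropout
        total_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                outputs = self.model(inputs)
                total_loss += self.criterion(outputs, labels).item()
                correct += (outputs.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        return total_loss / len(data_loader), correct / total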

4. Evaluation

Implement comprehensive evaluation metrics:

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

class ModelEvaluator:
    def __init__(self):
        self.metrics = {}
        
    def evaluate(self, y_true, y_pred):
        """Calculate classification metrics"""
        self.metrics['accuracy'] = accuracy_score(y_true, y_pred)
        self.metrics['report'] = classification_report(y_true, y_pred)
        
        return self.metrics
    
    def plot_confusion_matrix(self, y_true, y_pred, classes):
        """Plot confusion matrix"""
        cm = confusion_matrix(y_true, y_pred)
        
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=classes, yticklabels=classes)
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.show()
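
The text report produced by `classification_report` lists precision, recall, and F1 per class. To make those numbers less of a black box, here is a from-scratch sketch of the same quantities in plain Python. `per_class_scores` and `macro_f1` are hypothetical helpers written for illustration, not scikit-learn functions.

```python
from collections import Counter

def per_class_scores(y_true, y_pred):
    """Precision, recall, and F1 for every label seen in y_true or y_pred."""
    tp, fp, fn = Counter(), Counter(), Counter()
    for truth, pred in zip(y_true, y_pred):
        if truth == pred:
            tp[truth] += 1
        else:
            fp[pred] += 1   # predicted this label, but it was wrong
            fn[truth] += 1  # missed an example of the true label
    scores = {}
    for label in sorted(set(y_true) | set(y_pred)):
        predicted = tp[label] + fp[label]
        actual = tp[label] + fn[label]
        precision = tp[label] / predicted if predicted else 0.0
        recall = tp[label] / actual if actual else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        scores[label] = {"precision": precision, "recall": recall, "f1": f1}
    return scores

def macro_f1(y_true, y_pred):
    """Unweighted mean of the per-class F1 scores."""
    scores = per_class_scores(y_true, y_pred)
    return sum(s["f1"] for s in scores.values()) / len(scores)

y_true = ["pos", "pos", "neg", "neg"]
y_pred = ["pos", "neg", "neg", "neg"]
print(per_class_scores(y_true, y_pred)["pos"]["recall"])  # 0.5
print(round(macro_f1(y_true, y_pred), 4))                 # 0.7333
```

Accuracy for this toy example is 0.75, yet recall for the `pos` class is only 0.5. That gap is why per-class metrics matter on imbalanced datasets.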
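
Advanced evaluation techniques:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import cross_val_score, learning_curve

class AdvancedEvaluator: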
    def __init__(self):
        self.basic_evaluator = ModelEvaluator()
        
    def cross_validate_model(self, model, X, y, cv=5):
        """Perform cross-validation"""
        cv_scores = cross_val_score(model, X, y, cv=cv)
        return {
            'mean_cv_score': cv_scores.mean(),
            'std_cv_score': cv_scores.std()
        }
    
    def learning_curve_analysis(self, model, X, y):
        """Plot learning curves"""
        train_sizes, train_scores, val_scores = learning_curve(
            model, X, y, cv=5, n_jobs=-1, 
            train_sizes=np.linspace(0.1, 1.0, 10))
        
        plt.figure(figsize=(10, 6))
        plt.plot(train_sizes, np.mean(train_scores, axis=1), label='Training score')
        plt.plot(train_sizes, np.mean(val_scores, axis=1), label='Cross-validation score')
        plt.title('Learning Curves')
        plt.xlabel('Training Examples')
        plt.ylabel('Score')
        plt.legend(loc='best')
        plt.grid(True)
        plt.show()
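
Cross-validation trains and scores the model `cv` times, each time holding out a different slice of the data. The sketch below shows how the indices are partitioned for plain, unshuffled K-fold. `kfold_indices` is a hypothetical helper. For classifiers, `cross_val_score` with an integer `cv` uses stratified folds, which additionally keep the class proportions similar in every fold, so treat this as an illustration of the idea rather than a reimplementation.

```python
def kfold_indices(n_samples, n_splits=5):
    """Yield (train_indices, val_indices) pairs for unshuffled K-fold."""
    if not 2 <= n_splits <= n_samples:
        raise ValueError("n_splits must be between 2 and n_samples")
    # Spread the remainder over the first folds so sizes differ by at most one
    fold_sizes = [n_samples // n_splits] * n_splits
    for i in range(n_samples % n_splits):
        fold_sizes[i] += 1
    indices = list(range(n_samples))
    start = 0
    for size in fold_sizes:
        val = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, val
        start += size

for train, val in kfold_indices(7, n_splits=3):
    print(train, val)
# [3, 4, 5, 6] [0, 1, 2]
# [0, 1, 2, 5, 6] [3, 4]
# [0, 1, 2, 3, 4] [5, 6]
```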

5. Model Deployment

Set up a simple API for text classification:

from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn

class TextInput(BaseModel):
    text: str

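app = FastAPI()

# Create the heavy objects once at startup instead of on every request
preprocessor = TextPreprocessor()
embedder = TextEmbedder()

# A plain `def` route runs in FastAPI's thread pool, so the blocking
# model call does not stall the event loop
@app.post("/classify/")
def classify_text(input_data: TextInput):
    # Preprocess
    clean_text = preprocessor.clean_text(input_data.text)
    
    # Get embeddings
    embedding = embedder.get_bert_embeddings([clean_text])
    
    # Make prediction; `model` must be a trained classifier (for example, a fitted
    # TextClassifier from step 3) that is loaded before the app starts
    prediction = model.predict(embedding)[0]
    
    # Convert NumPy scalars to native Python types so the response is JSON-serializable
    if hasattr(prediction, 'item'):
        prediction = prediction.item()
    
    return {"prediction": prediction}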

Advanced deployment setup:

import hashlib

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import mlflow
import redis
import uvicorn

class ProductionClassifier:
    def __init__(self):
        # Load models and preprocessing tools
        self.preprocessor = AdvancedTextPreprocessor()
        self.embedder = MultiEmbedder()
        
        # Initialize Redis for caching; decode_responses returns str instead of bytes
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0,
                                        decode_responses=True)
        
        # Load models from MLflow
        self.models = self.load_models()
        
    def load_models(self):
        """Load models from the MLflow Model Registry"""
        models = {}
        for model_name in ['bert', 'traditional']:
            model_path = f"models:/{model_name}/Production"
            models[model_name] = mlflow.pyfunc.load_model(model_path)
        return models
    
    def get_cached_prediction(self, cache_key):
        """Check cache for previous predictions"""
        return self.redis_client.get(cache_key)
    
    def cache_prediction(self, cache_key, prediction, expire_time=3600):
        """Cache prediction results (stored as a string)"""
        self.redis_client.setex(cache_key, expire_time, str(prediction))

class ModelServer:
    def __init__(self):
        self.app = FastAPI()
        self.classifier = ProductionClassifier()
        
        # Add CORS middleware. Wildcard origins should not be combined with
        # credentials; list explicit origins if cookies or auth headers are needed.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=False,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        
        # Add routes
        self.setup_routes()
        
    def setup_routes(self):
        # Plain `def` routes run in FastAPI's thread pool, so the blocking
        # Redis and model calls do not stall the event loop
        @self.app.post("/classify/")
        def classify_text(input_data: TextInput):
            try:
                # Key the cache on a content digest: the built-in hash() of a
                # string is salted per process, so it differs between workers
                cache_key = "clf:" + hashlib.sha256(
                    input_data.text.encode("utf-8")).hexdigest()
                cached_result = self.classifier.get_cached_prediction(cache_key)
                
                if cached_result is not None:
                    return {"prediction": cached_result, "source": "cache"}
                
                # Process new prediction
                result = self.process_text(input_data.text)
                
                # Cache result
                self.classifier.cache_prediction(cache_key, result)
                
                return {"prediction": result, "source": "model"}
                
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))
        
        @self.app.get("/health/")
        def health_check():
            return {"status": "healthy"}
        
    def process_text(self, text):
        # Preprocess, embed, then predict
        clean_text = self.classifier.preprocessor.basic_preprocessor.clean_text(text)
        embedding = self.classifier.embedder.bert_embedder.get_bert_embeddings([clean_text])
        prediction = self.classifier.models['bert'].predict(embedding)[0]
        # Convert NumPy scalars to native Python types for JSON and Redis
        return prediction.item() if hasattr(prediction, 'item') else prediction

# Run server
if __name__ == "__main__":
    server = ModelServer()
    uvicorn.run(server.app, host="0.0.0.0", port=8000)
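
For local development or unit tests where no Redis server is running, a small in-memory stand-in that offers the same `get` and `setex` calls can take the place of `redis_client`. The sketch below is one way to write it. `TTLCache` is a hypothetical class, not part of the redis package, and it is neither shared between processes nor thread-safe.

```python
import time

class TTLCache:
    """In-memory key-value store whose entries expire after a time-to-live."""

    def __init__(self, clock=time.monotonic):
        self._store = {}
        self._clock = clock  # injectable so tests can control time

    def setex(self, key, ttl_seconds, value):
        """Store value under key for ttl_seconds (same argument order as Redis SETEX)."""
        self._store[key] = (self._clock() + ttl_seconds, value)

    def get(self, key):
        """Return the stored value, or None if the key is missing or expired."""
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]
            return None
        return value

cache = TTLCache()
cache.setex("clf:example", 3600, "positive")
print(cache.get("clf:example"))  # positive
print(cache.get("clf:unknown"))  # None
```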

6. Performance Monitoring and Maintenance

Set up monitoring for the deployed model:

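from datetime import datetime
import logging
import os

class ModelMonitor:
    def __init__(self, log_dir='logs'):
        self.setup_logging(log_dir)
        
    def setup_logging(self, log_dir):
        # Create the log directory first; basicConfig fails if it is missing
        os.makedirs(log_dir, exist_ok=True)
        logging.basicConfig(
            filename=os.path.join(log_dir, f'model_monitoring_{datetime.now():%Y%m%d}.log'),
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )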
        
    def log_prediction(self, text, prediction, confidence):
        """Log each prediction with metadata"""
        logging.info(
            f"Prediction made:\n"
            f"Text length: {len(text)}\n"
            f"Prediction: {prediction}\n"
            f"Confidence: {confidence:.4f}"
        )
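
`log_prediction` expects a `confidence` value, while the `predict` methods shown earlier return only a label. scikit-learn classifiers such as `RandomForestClassifier` and `LogisticRegression` expose `predict_proba` for this purpose. For a network that outputs raw logits, such as `DeepTextClassifier`, one option is to apply softmax and take the largest probability. A minimal sketch on plain Python lists, with hypothetical helper names:

```python
import math

def softmax(logits):
    """Convert raw scores to probabilities that sum to 1."""
    highest = max(logits)  # subtracting the max avoids overflow in exp()
    exps = [math.exp(x - highest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def top_prediction(logits, class_names):
    """Return (label, confidence) for the highest-scoring class."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    return class_names[best], probs[best]

label, confidence = top_prediction([2.0, 0.5, 0.1], ["sports", "politics", "tech"])
print(label, f"{confidence:.4f}")
```

Keep in mind that a softmax probability is not automatically a calibrated confidence. A model can be confidently wrong, so these values are best tracked as a trend rather than read as exact probabilities.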

Advanced monitoring setup:

from datetime import datetime

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.metrics import accuracy_score

class AdvancedModelMonitor:
    def __init__(self):
        self.basic_monitor = ModelMonitor()
        self.predictions_log = []
        self.performance_metrics = {}
        
    def log_prediction_with_features(self, text, prediction, actual=None, metadata=None):
        """Log detailed prediction information"""
        log_entry = {
            'timestamp': datetime.now(),
            'text_length': len(text),
            'prediction': prediction,
            'actual': actual,
            'metadata': metadata or {}
        }
        self.predictions_log.append(log_entry)
        
    def calculate_performance_metrics(self, window_size=1000):
        """Calculate rolling performance metrics"""
        if len(self.predictions_log) < window_size:
            return
            
        recent_predictions = pd.DataFrame(self.predictions_log[-window_size:])
        
        # Calculate accuracy on the rows where an actual label was logged.
        # The 'actual' column always exists, so check for non-missing values instead.
        labeled = recent_predictions.dropna(subset=['actual'])
        if not labeled.empty:
            accuracy = accuracy_score(
                labeled['actual'],
                labeled['prediction']
            )
            self.performance_metrics['accuracy'] = accuracy
        
        # Calculate prediction distribution
        pred_dist = recent_predictions['prediction'].value_counts(normalize=True)
        self.performance_metrics['prediction_distribution'] = pred_dist.to_dict()
        
    def plot_performance_trends(self):
        """Plot performance trends over time"""
        df = pd.DataFrame(self.predictions_log)
        
        # Create rolling accuracy plot
        fig = go.Figure()
        
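        # Rolling accuracy over the rows that have an actual label
        labeled = df.dropna(subset=['actual'])
        if not labeled.empty:
            # 1.0 where the prediction matched the label, else 0.0
            correct = (labeled['actual'] == labeled['prediction']).astype(float)
            rolling_accuracy = correct.rolling(window=100).mean()
            
            fig.add_trace(go.Scatter(
                x=labeled['timestamp'],
                y=rolling_accuracy,
                mode='lines',
                name='Rolling Accuracy'
            ))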
            
        # Add prediction distribution
        pred_dist_over_time = df.groupby([
            pd.Grouper(key='timestamp', freq='D'),
            'prediction'
        ]).size().unstack(fill_value=0)
        
        for col in pred_dist_over_time.columns:
            fig.add_trace(go.Scatter(
                x=pred_dist_over_time.index,
                y=pred_dist_over_time[col],
                mode='lines',
                name=f'Class {col} predictions'
            ))
            
        fig.update_layout(
            title='Model Performance Trends',
            xaxis_title='Time',
            yaxis_title='Metric Value',
            hovermode='x unified'
        )
        
        fig.show()
        
    def generate_performance_report(self):
        """Generate comprehensive performance report"""
        report = {
            'timestamp': datetime.now(),
            'total_predictions': len(self.predictions_log),
            'recent_metrics': self.performance_metrics,
            'prediction_volume': len(self.predictions_log[-1000:]),
            'average_text_length': np.mean([
                log['text_length'] for log in self.predictions_log[-1000:]
            ])
        }
        
        return report
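
The `prediction_distribution` metric becomes more useful when it is compared against a reference, for example the class distribution observed on the validation set. One simple comparison is the total variation distance between the two distributions. The sketch below is illustrative: the helper names and the `0.2` alert threshold are assumptions, and a suitable threshold depends on your data.

```python
from collections import Counter

def class_distribution(predictions):
    """Map each predicted label to its share of all predictions."""
    counts = Counter(predictions)
    total = sum(counts.values())
    if total == 0:
        return {}
    return {label: n / total for label, n in counts.items()}

def total_variation_distance(p, q):
    """Half the sum of absolute differences; 0 means identical, 1 means disjoint."""
    labels = set(p) | set(q)
    return 0.5 * sum(abs(p.get(label, 0.0) - q.get(label, 0.0)) for label in labels)

baseline = class_distribution(["spam"] * 50 + ["ham"] * 50)
recent = class_distribution(["spam"] * 80 + ["ham"] * 20)

drift = total_variation_distance(baseline, recent)
DRIFT_THRESHOLD = 0.2  # assumed value for illustration
print(round(drift, 3), drift > DRIFT_THRESHOLD)  # 0.3 True
```

A shift in predicted classes does not prove the model has degraded, since the incoming traffic may really have changed. It is still a cheap signal that works without ground-truth labels.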

7. Best Practices and Optimization

  1. Model Optimization:

    • Use model quantization for faster inference
    • Implement batch prediction for better throughput
    • Cache frequent predictions
    • Consider using TensorRT for GPU acceleration
  2. Error Handling:

    • Implement robust error handling for malformed inputs
    • Add retry logic for failed predictions
    • Monitor and log edge cases
    • Set up alerts for critical failures
  3. Performance Tips:

    • Use appropriate batch sizes for embedding generation
    • Implement proper text cleaning pipeline
    • Consider using GPU acceleration for large-scale deployment
    • Optimize model loading and caching strategies
  4. Monitoring Best Practices:

    • Set up comprehensive logging
    • Monitor system resources (CPU, memory, GPU)
    • Track prediction latency
    • Implement data drift detection
    • Set up automated alerts for performance degradation
  5. Scaling Considerations:

    • Use load balancing for multiple model instances
    • Implement horizontal scaling for high traffic
    • Consider using Kubernetes for container orchestration
    • Set up auto-scaling based on traffic patterns
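
Two of the practices listed above, batch prediction and retry logic, can be sketched with the standard library alone. The helpers below are hypothetical and assume that `predict_fn` takes a list of texts and returns one result per text. The default batch size and delays are placeholders to tune for your model and hardware.

```python
import time

def iter_batches(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

def predict_in_batches(texts, predict_fn, batch_size=32):
    """Run predict_fn over texts in fixed-size batches and concatenate the results."""
    results = []
    for batch in iter_batches(texts, batch_size):
        results.extend(predict_fn(batch))
    return results

def with_retries(fn, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**attempt and try again."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            sleep(base_delay * 2 ** attempt)

# Stand-in for a real model call
def fake_predict(batch):
    return [len(text) for text in batch]

print(predict_in_batches(["a", "bb", "ccc"], fake_predict, batch_size=2))  # [1, 2, 3]
```

Retries are only appropriate for transient failures such as timeouts. Retrying on malformed input repeats the same error, so validate inputs before the call.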

This deployment and monitoring setup provides:

  1. Production-ready API implementation
  2. Comprehensive monitoring system
  3. Performance optimization guidelines
  4. Scaling strategies
  5. Best practices for maintaining model performance