Building an NLP Pipeline: Text Classification
Objective
Build a complete Natural Language Processing (NLP) pipeline for text classification, learning both traditional and modern deep learning approaches. You’ll convert text data into numerical representations using embeddings and implement classification models to categorize text effectively.
Learning Outcomes
By completing this project, you will:
- Master fundamental NLP concepts and text processing techniques
- Understand and implement different text embedding methods
- Build text classification models using both traditional ML and deep learning
- Evaluate and optimize NLP model performance
- Gain practical experience with industry-standard NLP tools and libraries
Skills Gained
- Building end-to-end NLP pipelines
- Implementing modern text embedding techniques
- Creating text classification systems
- Using state-of-the-art NLP libraries and models
- Evaluating and optimizing NLP models
- Handling real-world text data challenges
Tools Required
# Core libraries
pip install transformers
pip install torch torchvision
pip install scikit-learn
pip install nltk
pip install gensim
# Text processing
pip install spacy
pip install textblob
# Optional: for OpenAI embeddings
pip install openai
Project Structure
text_classification/
│
├── data/
│ ├── raw/
│ │ ├── train.csv
│ │ └── test.csv
│ └── processed/
│
├── src/
│ ├── preprocessing.py
│ ├── embeddings/
│ │ ├── traditional.py
│ │ ├── transformer.py
│ │ └── openai.py
│ ├── models.py
│ └── evaluation.py
│
└── notebooks/
├── 1_data_exploration.ipynb
├── 2_embeddings.ipynb
├── 3_model_training.ipynb
└── 4_evaluation.ipynb
Steps and Tasks
1. Data Preprocessing
First, let’s implement a robust text preprocessing pipeline:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import re
class TextPreprocessor:
def __init__(self):
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
self.lemmatizer = WordNetLemmatizer()
self.stop_words = set(stopwords.words('english'))
def clean_text(self, text):
"""Basic text cleaning"""
# Convert to lowercase
text = text.lower()
# Remove special characters and digits
text = re.sub(r'[^a-zA-Z\s]', '', text)
# Tokenization
tokens = word_tokenize(text)
# Remove stopwords and lemmatize
tokens = [self.lemmatizer.lemmatize(token)
for token in tokens
if token not in self.stop_words]
return ' '.join(tokens)
Click to view advanced preprocessing
class AdvancedTextPreprocessor:
def __init__(self):
self.basic_preprocessor = TextPreprocessor()
self.nlp = spacy.load('en_core_web_sm')
def extract_entities(self, text):
"""Extract named entities"""
doc = self.nlp(text)
entities = [(ent.text, ent.label_) for ent in doc.ents]
return entities
def get_pos_tags(self, text):
"""Get POS tags"""
doc = self.nlp(text)
return [(token.text, token.pos_) for token in doc]
def clean_and_augment(self, text):
"""Clean text and add linguistic features"""
# Basic cleaning
clean_text = self.basic_preprocessor.clean_text(text)
# Get entities and POS tags
entities = self.extract_entities(text)
pos_tags = self.get_pos_tags(text)
return {
'clean_text': clean_text,
'entities': entities,
'pos_tags': pos_tags
}
2. Text Embeddings
Implement different embedding approaches:
from transformers import AutoTokenizer, AutoModel
import torch
class TextEmbedder:
def __init__(self, model_name='bert-base-uncased'):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name)
def get_bert_embeddings(self, texts):
"""Get BERT embeddings for texts"""
# Tokenize
encoded = self.tokenizer(
texts,
padding=True,
truncation=True,
max_length=512,
return_tensors='pt'
)
# Get embeddings
with torch.no_grad():
outputs = self.model(**encoded)
embeddings = outputs.last_hidden_state[:, 0, :]
return embeddings.numpy()
Click to view additional embedding methods
class MultiEmbedder:
def __init__(self):
self.bert_embedder = TextEmbedder()
def get_word2vec_embeddings(self, texts, vector_size=100):
"""Get Word2Vec embeddings"""
# Train Word2Vec model
tokenized_texts = [text.split() for text in texts]
w2v_model = Word2Vec(
sentences=tokenized_texts,
vector_size=vector_size,
window=5,
min_count=1
)
# Get sentence embeddings by averaging word vectors
embeddings = []
for text in tokenized_texts:
vectors = [w2v_model.wv[word] for word in text if word in w2v_model.wv]
if vectors:
embeddings.append(np.mean(vectors, axis=0))
else:
embeddings.append(np.zeros(vector_size))
return np.array(embeddings)
def get_openai_embeddings(self, texts):
"""Get OpenAI embeddings"""
import openai
embeddings = []
for text in texts:
response = openai.Embedding.create(
input=text,
model="text-embedding-ada-002"
)
embeddings.append(response['data'][0]['embedding'])
return np.array(embeddings)
3. Model Implementation
Create text classification models:
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
class TextClassifier:
def __init__(self, model_type='rf'):
if model_type == 'rf':
self.model = RandomForestClassifier(
n_estimators=100,
max_depth=None,
min_samples_split=2,
random_state=42
)
else:
self.model = LogisticRegression(
max_iter=1000,
random_state=42
)
def train(self, X_train, y_train):
"""Train the classifier"""
self.model.fit(X_train, y_train)
def predict(self, X):
"""Make predictions"""
return self.model.predict(X)
Click to view deep learning model implementation
class DeepTextClassifier(nn.Module):
def __init__(self, input_dim, hidden_dim, num_classes):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim // 2, num_classes)
)
def forward(self, x):
return self.layers(x)
class TextClassificationTrainer:
def __init__(self, model, device='cuda'):
self.model = model
self.device = device
self.model.to(device)
def train(self, train_loader, val_loader, epochs=10):
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(self.model.parameters())
for epoch in range(epochs):
self.model.train()
train_loss = 0
for batch in train_loader:
optimizer.zero_grad()
inputs, labels = batch
inputs = inputs.to(self.device)
labels = labels.to(self.device)
outputs = self.model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Validation
val_loss, val_acc = self.evaluate(val_loader)
print(f'Epoch {epoch+1}: Train Loss = {train_loss/len(train_loader):.4f}, '
f'Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}')
4. Evaluation
Implement comprehensive evaluation metrics:
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
class ModelEvaluator:
def __init__(self):
self.metrics = {}
def evaluate(self, y_true, y_pred):
"""Calculate classification metrics"""
self.metrics['accuracy'] = accuracy_score(y_true, y_pred)
self.metrics['report'] = classification_report(y_true, y_pred)
return self.metrics
def plot_confusion_matrix(self, y_true, y_pred, classes):
"""Plot confusion matrix"""
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=classes, yticklabels=classes)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
Click to view advanced evaluation techniques
class AdvancedEvaluator:
def __init__(self):
self.basic_evaluator = ModelEvaluator()
def cross_validate_model(self, model, X, y, cv=5):
"""Perform cross-validation"""
cv_scores = cross_val_score(model, X, y, cv=cv)
return {
'mean_cv_score': cv_scores.mean(),
'std_cv_score': cv_scores.std()
}
def learning_curve_analysis(self, model, X, y):
"""Plot learning curves"""
train_sizes, train_scores, val_scores = learning_curve(
model, X, y, cv=5, n_jobs=-1,
train_sizes=np.linspace(0.1, 1.0, 10))
plt.figure(figsize=(10, 6))
plt.plot(train_sizes, np.mean(train_scores, axis=1), label='Training score')
plt.plot(train_sizes, np.mean(val_scores, axis=1), label='Cross-validation score')
plt.title('Learning Curves')
plt.xlabel('Training Examples')
plt.ylabel('Score')
plt.legend(loc='best')
plt.grid(True)
plt.show()
5. Model Deployment
Set up a simple API for text classification:
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
class TextInput(BaseModel):
text: str
app = FastAPI()
@app.post("/classify/")
async def classify_text(input_data: TextInput):
# Preprocess
preprocessor = TextPreprocessor()
clean_text = preprocessor.clean_text(input_data.text)
# Get embeddings
embedder = TextEmbedder()
embedding = embedder.get_bert_embeddings([clean_text])
# Make prediction
prediction = model.predict(embedding)[0]
return {"prediction": prediction}
Click to view advanced deployment setup
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import mlflow
import redis
class ProductionClassifier:
def __init__(self):
# Load models and preprocessing tools
self.preprocessor = AdvancedTextPreprocessor()
self.embedder = MultiEmbedder()
# Initialize Redis for caching
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Load models from MLflow
self.models = self.load_models()
def load_models(self):
"""Load models from MLflow"""
models = {}
for model_name in ['bert', 'traditional']:
model_path = f"models:/{model_name}/Production"
models[model_name] = mlflow.pyfunc.load_model(model_path)
return models
async def get_cached_prediction(self, text_hash):
"""Check cache for previous predictions"""
return self.redis_client.get(text_hash)
async def cache_prediction(self, text_hash, prediction, expire_time=3600):
"""Cache prediction results"""
self.redis_client.setex(text_hash, expire_time, prediction)
class ModelServer:
def __init__(self):
self.app = FastAPI()
self.classifier = ProductionClassifier()
# Add CORS middleware
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add routes
self.setup_routes()
def setup_routes(self):
@self.app.post("/classify/")
async def classify_text(input_data: TextInput):
try:
# Check cache
text_hash = hash(input_data.text)
cached_result = await self.classifier.get_cached_prediction(text_hash)
if cached_result:
return {"prediction": cached_result, "source": "cache"}
# Process new prediction
result = await self.process_text(input_data.text)
# Cache result
await self.classifier.cache_prediction(text_hash, result)
return {"prediction": result, "source": "model"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/health/")
async def health_check():
return {"status": "healthy"}
async def process_text(self, text):
# Implement processing pipeline
clean_text = self.classifier.preprocessor.clean_text(text)
embedding = self.classifier.embedder.get_bert_embeddings([clean_text])
prediction = self.classifier.models['bert'].predict(embedding)[0]
return prediction
# Run server
if __name__ == "__main__":
server = ModelServer()
uvicorn.run(server.app, host="0.0.0.0", port=8000)
6. Performance Monitoring and Maintenance
Set up monitoring for the deployed model:
from datetime import datetime
import logging
class ModelMonitor:
def __init__(self):
self.setup_logging()
def setup_logging(self):
logging.basicConfig(
filename=f'logs/model_monitoring_{datetime.now():%Y%m%d}.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def log_prediction(self, text, prediction, confidence):
"""Log each prediction with metadata"""
logging.info(
f"Prediction made:\n"
f"Text length: {len(text)}\n"
f"Prediction: {prediction}\n"
f"Confidence: {confidence:.4f}"
)
Click to view advanced monitoring setup
import pandas as pd
from sklearn.metrics import accuracy_score
import plotly.graph_objects as go
class AdvancedModelMonitor:
def __init__(self):
self.basic_monitor = ModelMonitor()
self.predictions_log = []
self.performance_metrics = {}
def log_prediction_with_features(self, text, prediction, actual=None, metadata=None):
"""Log detailed prediction information"""
log_entry = {
'timestamp': datetime.now(),
'text_length': len(text),
'prediction': prediction,
'actual': actual,
'metadata': metadata or {}
}
self.predictions_log.append(log_entry)
def calculate_performance_metrics(self, window_size=1000):
"""Calculate rolling performance metrics"""
if len(self.predictions_log) < window_size:
return
recent_predictions = pd.DataFrame(self.predictions_log[-window_size:])
# Calculate accuracy if actual labels are available
if 'actual' in recent_predictions.columns:
accuracy = accuracy_score(
recent_predictions['actual'],
recent_predictions['prediction']
)
self.performance_metrics['accuracy'] = accuracy
# Calculate prediction distribution
pred_dist = recent_predictions['prediction'].value_counts(normalize=True)
self.performance_metrics['prediction_distribution'] = pred_dist.to_dict()
def plot_performance_trends(self):
"""Plot performance trends over time"""
df = pd.DataFrame(self.predictions_log)
# Create rolling accuracy plot
fig = go.Figure()
if 'actual' in df.columns:
rolling_accuracy = df.rolling(window=100).apply(
lambda x: accuracy_score(x['actual'], x['prediction'])
)
fig.add_trace(go.Scatter(
x=df['timestamp'],
y=rolling_accuracy,
mode='lines',
name='Rolling Accuracy'
))
# Add prediction distribution
pred_dist_over_time = df.groupby([
pd.Grouper(key='timestamp', freq='D'),
'prediction'
]).size().unstack(fill_value=0)
for col in pred_dist_over_time.columns:
fig.add_trace(go.Scatter(
x=pred_dist_over_time.index,
y=pred_dist_over_time[col],
mode='lines',
name=f'Class {col} predictions'
))
fig.update_layout(
title='Model Performance Trends',
xaxis_title='Time',
yaxis_title='Metric Value',
hovermode='x unified'
)
fig.show()
def generate_performance_report(self):
"""Generate comprehensive performance report"""
report = {
'timestamp': datetime.now(),
'total_predictions': len(self.predictions_log),
'recent_metrics': self.performance_metrics,
'prediction_volume': len(self.predictions_log[-1000:]),
'average_text_length': np.mean([
log['text_length'] for log in self.predictions_log[-1000:]
])
}
return report
7. Best Practices and Optimization
-
Model Optimization:
- Use model quantization for faster inference
- Implement batch prediction for better throughput
- Cache frequent predictions
- Consider using TensorRT for GPU acceleration
-
Error Handling:
- Implement robust error handling for malformed inputs
- Add retry logic for failed predictions
- Monitor and log edge cases
- Set up alerts for critical failures
-
Performance Tips:
- Use appropriate batch sizes for embedding generation
- Implement proper text cleaning pipeline
- Consider using GPU acceleration for large-scale deployment
- Optimize model loading and caching strategies
-
Monitoring Best Practices:
- Set up comprehensive logging
- Monitor system resources (CPU, memory, GPU)
- Track prediction latency
- Implement data drift detection
- Set up automated alerts for performance degradation
-
Scaling Considerations:
- Use load balancing for multiple model instances
- Implement horizontal scaling for high traffic
- Consider using Kubernetes for container orchestration
- Set up auto-scaling based on traffic patterns
This deployment and monitoring setup provides:
- Production-ready API implementation
- Comprehensive monitoring system
- Performance optimization guidelines
- Scaling strategies
- Best practices for maintaining model performance