Files
SAP-PLEX-SYNC/backend/src/handlers_sync.rs
b0rbor4d 5b447acd1c
Some checks failed
CI/CD Pipeline / Backend Tests (push) Failing after 27s
CI/CD Pipeline / Frontend Tests (push) Failing after 15s
CI/CD Pipeline / Docker Build (push) Has been skipped
CI/CD Pipeline / Security Scan (push) Has been skipped
Initial commit
2026-04-15 01:41:49 +02:00

369 lines
13 KiB
Rust
Executable File

use crate::state::AppState;
use crate::sync::*;
use axum::http::StatusCode;
use axum::{extract::State, response::IntoResponse, Json};
use postgres::Row;
use serde_json::json;
use std::sync::Arc;
use crate::errors::ApiError;
use tracing::{info, error};
pub async fn sync_status(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, "Get sync status");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
// Get stats
let stats_result = conn.query_one(
"SELECT
COUNT(*) FILTER (WHERE status = 'running'::sync_job_status) AS running,
COUNT(*) FILTER (WHERE status = 'completed'::sync_job_status AND created_at >= CURRENT_DATE) AS completed_today,
COUNT(*) FILTER (WHERE status = 'failed'::sync_job_status AND created_at >= CURRENT_DATE) AS failed_today
FROM sync_jobs",
&[],
);
let (running, completed_today, failed_today) = match stats_result {
Ok(row) => (
row.get::<_, i64>(0),
row.get::<_, i64>(1),
row.get::<_, i64>(2),
),
Err(e) => {
error!(request_id = %request_id, error = %e, "Stats query failed");
(0, 0, 0)
}
};
// Get recent jobs
let recent_result = match conn.query(
"SELECT id, job_type, sync_direction, status::text, records_processed, records_failed, created_at::text, started_at::text, completed_at::text
FROM sync_jobs ORDER BY created_at DESC LIMIT 5",
&[],
) {
Ok(r) => r,
Err(e) => {
error!(request_id = %request_id, "Database error");
return Err(ApiError::Database(e.to_string()));
}
};
let recent_jobs: Vec<_> = recent_result
.into_iter()
.map(|row| job_to_json(&row))
.collect();
let response = json!({
"is_running": running > 0,
"current_job": recent_jobs.iter().find(|job| job["status"] == "running").cloned(),
"recent_jobs": recent_jobs,
"stats": {
"running": running,
"completed_today": completed_today,
"failed_today": failed_today
}
});
Ok((StatusCode::OK, Json(response)).into_response())
}
pub async fn sync_jobs(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, "Get sync jobs");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
let result = conn.query(
"SELECT id, job_type, sync_direction, status::text, records_processed, records_failed, created_at::text, started_at::text, completed_at::text
FROM sync_jobs ORDER BY created_at DESC LIMIT 20",
&[],
);
match result {
Ok(rows) => {
let jobs: Vec<_> = rows.into_iter().map(|r| job_to_json(&r)).collect();
Ok((StatusCode::OK, Json(json!({ "jobs": jobs }))).into_response())
}
Err(e) => {
error!(request_id = %request_id, error = %e, "Database error");
Err(ApiError::Database(e.to_string()))
}
}
}
pub async fn start_sync(
State(state): State<Arc<AppState>>,
req: SyncStartRequest,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, job_type = %req.job_type, direction = %req.sync_direction, "Start sync");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
let user_id = match &req.session_id {
Some(session_id) => {
match conn.query_opt(
"SELECT user_id FROM sessions WHERE id = $1 AND expires_at > CURRENT_TIMESTAMP",
&[&session_id],
) {
Ok(Some(row)) => row.get::<_, i32>(0),
Ok(None) => {
error!(request_id = %request_id, session_id = %session_id, "Session not found");
return Err(ApiError::Authentication("Session not found or expired".to_string()));
}
Err(e) => {
error!(request_id = %request_id, error = %e, "Session query error");
return Err(ApiError::Database(e.to_string()));
}
}
}
None => {
error!(request_id = %request_id, "No session ID provided");
return Err(ApiError::Authentication("No session ID provided".to_string()));
}
};
match conn.execute(
"INSERT INTO sync_jobs (job_type, sync_direction, status, created_by, created_at) VALUES ($1, $2, 'pending'::sync_job_status, $3, NOW())",
&[&req.job_type, &req.sync_direction, &user_id],
) {
Ok(_) => {
info!(request_id = %request_id, "Sync job created");
Ok((StatusCode::OK, Json(json!({
"message": "Sync job started",
"job_type": req.job_type,
"direction": req.sync_direction
}))).into_response())
}
Err(e) => {
error!(request_id = %request_id, error = %e, "Failed to create sync job");
Err(ApiError::Database(e.to_string()))
}
}
}
pub async fn stop_sync(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, "Stop sync");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
match conn.execute(
"UPDATE sync_jobs SET status = 'cancelled'::sync_job_status, completed_at = NOW() WHERE status IN ('running'::sync_job_status, 'pending'::sync_job_status)",
&[],
) {
Ok(_) => {
info!(request_id = %request_id, "Sync jobs stopped");
Ok((StatusCode::OK, Json(json!({ "message": "Sync jobs stopped" }))).into_response())
}
Err(e) => {
error!(request_id = %request_id, error = %e, "Failed to stop sync jobs");
Err(ApiError::Database(e.to_string()))
}
}
}
pub async fn simulate_sync(
State(state): State<Arc<AppState>>,
data: serde_json::Value,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, "Simulate sync");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
let mut items: Vec<SyncItem> = Vec::new();
let data_type = data.get("data_type").and_then(|v| v.as_str()).unwrap_or("unknown");
// Fetch customers from database
if data_type == "customers" {
let rows = match conn.query(
"SELECT sap_card_code, plesk_customer_id, plesk_subscription_id FROM customers",
&[],
) {
Ok(r) => r,
Err(e) => {
error!(request_id = %request_id, error = %e, "Database error");
return Err(ApiError::Database(e.to_string()));
}
};
for (i, row) in rows.iter().enumerate() {
let sap_code: String = row.get(0);
let status = if i % 3 == 0 { "new" } else if i % 3 == 1 { "update" } else { "unchanged" };
items.push(SyncItem {
id: format!("sim-{}", i),
source_id: sap_code.clone(),
target_id: if status == "new" { None } else { Some(format!("PLESK-{}", 2000 + i)) },
name: format!("Customer {}", sap_code),
status: status.to_string(),
source_data: json!({"sap_card_code": sap_code}),
target_data: if status == "new" { None } else { Some(json!({"plesk_id": 2000 + i})) },
diff: None,
});
}
} else if data_type == "domains" {
// Simulate domain data
for i in 0..10 {
let status = if i % 3 == 0 { "new" } else if i % 3 == 1 { "update" } else { "unchanged" };
items.push(SyncItem {
id: format!("sim-domain-{}", i),
source_id: format!("SAP-DOM-{}", 1000 + i),
target_id: if status == "new" { None } else { Some(format!("PLESK-DOM-{}", i)) },
name: format!("domain{}.example.com", i),
status: status.to_string(),
source_data: json!({"domain_id": 1000 + i}),
target_data: if status == "new" { None } else { Some(json!({"plesk_domain_id": i})) },
diff: None,
});
}
}
let direction = data.get("direction").and_then(|v| v.as_str()).unwrap_or("sap_to_plesk");
let result = SimulationResult {
data_type: data_type.to_string(),
direction: direction.to_string(),
total_records: items.len(),
new: items.iter().filter(|item| item.status == "new").count(),
updated: items.iter().filter(|item| item.status == "update").count(),
conflicts: items.iter().filter(|item| item.status == "conflict").count(),
unchanged: items.iter().filter(|item| item.status == "unchanged").count(),
deleted: 0,
items,
};
Ok((StatusCode::OK, Json(json!(result))).into_response())
}
pub async fn get_conflicts(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, "Get conflicts");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
let result = conn.query(
"SELECT id, sync_job_id, entity_type, entity_id, resolution_status, source_data, target_data, conflict_details FROM sync_conflicts ORDER BY created_at DESC LIMIT 20",
&[],
);
match result {
Ok(rows) => {
let conflicts: Vec<Conflict> = rows
.into_iter()
.map(|row| Conflict {
id: row.get::<_, i32>(0),
sync_job_id: row.get::<_, i32>(1),
entity_type: row.get::<_, String>(2),
entity_id: row.get::<_, String>(3),
resolution_status: row.get::<_, String>(4),
source_data: row.get::<_, Option<String>>(5).unwrap_or_default(),
target_data: row.get::<_, Option<String>>(6),
conflict_details: row.get::<_, Option<String>>(7),
})
.collect();
Ok((StatusCode::OK, Json(json!({ "conflicts": conflicts }))).into_response())
}
Err(e) => {
error!(request_id = %request_id, error = %e, "Database error");
Err(ApiError::Database(e.to_string()))
}
}
}
pub async fn resolve_conflict(
State(state): State<Arc<AppState>>,
req: ConflictResolution,
) -> impl IntoResponse {
let request_id = uuid::Uuid::new_v4().to_string();
info!(request_id = %request_id, "Resolve conflict");
let mut conn = match state.pool.get() {
Ok(c) => c,
Err(e) => {
error!(request_id = %request_id, "Database connection error");
return Err(ApiError::Database(e.to_string()));
}
};
match conn.execute(
"UPDATE sync_conflicts SET resolution_status = $1, resolved_data = $2::jsonb WHERE id = $3",
&[&req.action, &req.resolved_data.to_string(), &req.id],
) {
Ok(_) => {
info!(request_id = %request_id, "Conflict resolved");
Ok((StatusCode::OK, Json(json!({ "message": "Conflict resolved" }))).into_response())
}
Err(e) => {
error!(request_id = %request_id, error = %e, "Failed to resolve conflict");
Err(ApiError::Database(e.to_string()))
}
}
}
fn job_to_json(row: &Row) -> serde_json::Value {
json!({
"id": row.get::<_, i32>(0),
"job_type": row.get::<_, String>(1),
"sync_direction": row.get::<_, String>(2),
"status": row.get::<_, String>(3),
"records_processed": row.get::<_, i32>(4),
"records_failed": row.get::<_, i32>(5),
"created_at": row.get::<_, String>(6),
"started_at": row.get::<_, Option<String>>(7),
"completed_at": row.get::<_, Option<String>>(8),
})
}