forked from MorphyKutay/PrestaShop-API
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_client.py
More file actions
149 lines (117 loc) · 5.22 KB
/
api_client.py
File metadata and controls
149 lines (117 loc) · 5.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
PrestaShop API Client
PHP API ile iletişim kuran Python modülü
"""
import requests
from typing import Dict, List, Optional
import json
class PrestaShopAPIClient:
"""PrestaShop API Client sınıfı"""
def __init__(self, api_url: str, api_key: str):
"""
API Client'ı başlatır
Args:
api_url: API URL'i
api_key: API anahtarı
"""
self.api_url = api_url
self.api_key = api_key
self.headers = {
'X-API-Key': api_key,
'Content-Type': 'application/json'
}
def _make_request(self, method: str, resource: str, data: Optional[Dict] = None,
params: Optional[Dict] = None) -> Dict:
"""
API'ye HTTP isteği gönderir
Args:
method: HTTP metodu
resource: Resource adı (products, orders)
data: Gönderilecek veri
params: URL parametreleri
Returns:
API yanıtı
"""
url = f"{self.api_url}?resource={resource}"
# Parametreleri ekle
if params:
for key, value in params.items():
url += f"&{key}={value}"
try:
if method == 'GET':
response = requests.get(url, headers=self.headers, timeout=10)
elif method == 'POST':
response = requests.post(url, headers=self.headers,
json=data, timeout=10)
elif method == 'PUT':
response = requests.put(url, headers=self.headers,
json=data, timeout=10)
elif method == 'DELETE':
response = requests.delete(url, headers=self.headers, timeout=10)
else:
raise ValueError(f"Desteklenmeyen HTTP metodu: {method}")
# JSON yanıtı parse et
result = response.json()
if response.status_code >= 400:
raise Exception(result.get('message', 'API hatası'))
return result
except requests.exceptions.Timeout:
raise Exception("İstek zaman aşımına uğradı")
except requests.exceptions.ConnectionError:
raise Exception("API'ye bağlanılamadı. URL'i kontrol edin.")
except requests.exceptions.RequestException as e:
raise Exception(f"İstek hatası: {str(e)}")
except json.JSONDecodeError:
raise Exception("Geçersiz API yanıtı")
# ============= ÜRÜN İŞLEMLERİ =============
def get_products(self, page: int = 1, limit: int = 50,
search: str = None, active: int = None) -> Dict:
"""Ürün listesini getirir"""
params = {'page': page, 'limit': limit}
if search:
params['search'] = search
if active is not None:
params['active'] = active
return self._make_request('GET', 'products', params=params)
def get_product(self, product_id: int) -> Dict:
"""Tek bir ürünü getirir"""
return self._make_request('GET', 'products', params={'id': product_id})
def create_product(self, product_data: Dict) -> Dict:
"""Yeni ürün oluşturur"""
return self._make_request('POST', 'products', data=product_data)
def update_product(self, product_id: int, product_data: Dict) -> Dict:
"""Ürünü günceller"""
return self._make_request('PUT', 'products', data=product_data,
params={'id': product_id})
def delete_product(self, product_id: int) -> Dict:
"""Ürünü siler"""
return self._make_request('DELETE', 'products', params={'id': product_id})
# ============= SİPARİŞ İŞLEMLERİ =============
def get_orders(self, page: int = 1, limit: int = 50,
status: int = None, customer: int = None) -> Dict:
"""Sipariş listesini getirir"""
params = {'page': page, 'limit': limit}
if status is not None:
params['status'] = status
if customer is not None:
params['customer'] = customer
return self._make_request('GET', 'orders', params=params)
def get_order(self, order_id: int) -> Dict:
"""Tek bir siparişi getirir"""
return self._make_request('GET', 'orders', params={'id': order_id})
def update_order_status(self, order_id: int, new_status: int) -> Dict:
"""Sipariş durumunu günceller"""
return self._make_request('PUT', 'orders',
data={'current_state': new_status},
params={'id': order_id})
def delete_order(self, order_id: int) -> Dict:
"""Siparişi siler"""
return self._make_request('DELETE', 'orders', params={'id': order_id})
# ============= TEST =============
def test_connection(self) -> bool:
"""API bağlantısını test eder"""
try:
result = self._make_request('GET', 'products', params={'limit': 1})
return result.get('success', False)
except:
return False