#!
/usr/bin/env python3
# coding: utf-8
"""
bighub_complete.py
Fluxo completo: signup -> purchase -> HubUp poll -> Upgate init -> payment-info ->
submit card -> final poll
Agora lê cartões de [Link] (formato: número|mes|ano|cvv) e processa cada um
separadamente.
Inclui relançamento automático em diretório temporário para execução limpa.
Logs em debug_all.txt, [Link] e resultados individuais por cartão.
"""
import requests
import json
import time
import random
import hashlib
import re
import os
import sys
import shutil
import tempfile
import subprocess
from pathlib import Path
from [Link] import urlparse, parse_qs
BASE = "[Link]
HUB = "[Link]
UPGATE = "[Link]
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like
Gecko) Chrome/[Link] Safari/537.36"
# ---------- util ----------
def mask_card(number):
if not number:
return ""
s = [Link](r'\D', '', str(number))
if len(s) <= 4:
return "****" + s
return "**** **** **** " + s[-4:]
def mask_cvv(cvv):
if not cvv:
return ""
return "***"
def now_ts():
return [Link]("%Y-%m-%dT%H:%M:%SZ", [Link]())
def parse_card_line(line):
"""Parse uma linha do arquivo [Link] (formato: número|mes|ano|cvv)"""
line = [Link]()
if not line or [Link]('#'):
return None
parts = [Link]('|')
if len(parts) != 4:
# Tentar outros separadores
for sep in [',', ';', ' ', '\t']:
if sep in line:
parts = [Link](sep)
if len(parts) == 4:
break
if len(parts) != 4:
print(f"Formato inválido na linha: {line}")
return None
card_number = parts[0].strip()
month = parts[1].strip().zfill(2)
year = parts[2].strip()
cvv = parts[3].strip()
# Ajustar ano se for 2 dígitos
if len(year) == 2:
year = "20" + year
# Criar expirationDate no formato MM/AA
expiration_date = f"{month}/{year[-2:]}"
return {
"cardNumber": card_number,
"expirationDate": expiration_date,
"cvv": cvv,
"cardHolder": "TESTE CARTAO" # Pode ser personalizado se necessário
}
def load_cards_from_file(filename="[Link]"):
"""Carrega cartões do arquivo [Link]"""
cards = []
if not [Link](filename):
print(f"Arquivo {filename} não encontrado!")
print("Crie um arquivo [Link] com um cartão por linha no formato:")
print("numero|mes|ano|cvv")
print("Exemplo: 5557241110076722|04|28|111")
return []
try:
with open(filename, 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
card_data = parse_card_line(line)
if card_data:
[Link](card_data)
print(f"Cartão {i}: {mask_card(card_data['cardNumber'])}
{card_data['expirationDate']}")
except Exception as e:
print(f"Erro ao ler arquivo {filename}: {e}")
return cards
# ---------- logger ----------
class Logger:
def __init__(self, path="debug_all.txt", mask_sensitive=True):
[Link] = path
self.mask_sensitive = mask_sensitive
with open([Link], "w", encoding="utf-8") as f:
[Link]("== Debug consolidado ==\n")
[Link](f"Started: {now_ts()}\n")
def _safe_dump(self, obj):
try:
return [Link](obj, ensure_ascii=False, indent=2)
except Exception:
return str(obj)
def write(self, title, content):
try:
with open([Link], "a", encoding="utf-8") as f:
[Link](f"\n--- {title} ---\n")
if hasattr(content, "status_code"):
[Link](f"STATUS: {content.status_code}\n")
try:
j = [Link]()
if self.mask_sensitive:
j = self._mask_response(j)
[Link](self._safe_dump(j))
except Exception:
text = [Link] if hasattr(content, "text") else
str(content)
[Link](self._mask_text(text))
else:
if isinstance(content, dict):
if self.mask_sensitive:
content = self._mask_response(content)
[Link](self._safe_dump(content))
else:
[Link](self._mask_text(str(content)))
[Link]("\n")
except Exception:
pass
def _mask_text(self, text):
text = [Link](r'(\d{4})\s?(\d{4})\s?(\d{4})\s?(\d{4})', '**** **** **** \\
4', text)
text = [Link](r'(\d{12,19})', lambda m: '**** **** **** ' + [Link](1)[-
4:], text)
text = [Link](r'cvv["\']?\s*[:=]\s*["\']?\d{3,4}["\']?', 'cvv: ***', text,
flags=[Link])
return text
def _mask_response(self, obj):
if isinstance(obj, dict):
out = {}
for k, v in [Link]():
key = [Link]()
if key in ("cardnumber", "card_number", "card"):
out[k] = mask_card(v)
elif key in ("cvv", "cvc", "securitycode"):
out[k] = mask_cvv(v)
elif key in ("cardholder", "cardholdername"):
out[k] = v
elif isinstance(v, (dict, list)):
out[k] = self._mask_response(v)
else:
out[k] = v
return out
if isinstance(obj, list):
return [self._mask_response(x) for x in obj]
return obj
# ---------- payment gateway helper ----------
class PaymentGatewayHelper:
def __init__(self, session, logger):
self.s = session
[Link] = logger
def _extract_token_from_url(self, url, param="token"):
try:
qs = parse_qs(urlparse(url).query)
return ([Link](param) or [None])[0]
except Exception:
return None
def poll_hub_for_form_url(self, initial_url, max_attempts=20, delay=1.5):
hub_token = self._extract_token_from_url(initial_url)
if not hub_token:
return False, "hub token ausente"
headers = {"User-Agent": UA, "Accept": "application/json", "Authorization":
f"Bearer {hub_token}"}
poll_url = f"{HUB}/api/redirect-service/v2/session/state"
for i in range(max_attempts):
r = [Link](poll_url, headers=headers, timeout=15)
[Link](f"hub_poll_attempt_{i+1}", r)
if r.status_code == 200:
data = [Link]()
action = ([Link]("action") or "").upper()
if action == "REDIRECT":
form_url = [Link]("url") or [Link]("formUrl")
if form_url:
return True, {"formUrl": form_url, "hubToken": hub_token}
elif action in ("SUCCESS", "FAILURE", "ERROR"):
return True, {"terminal": data, "hubToken": hub_token}
[Link](delay)
return False, "timeout polling hub"
def init_upgate_session(self, form_url):
r = [Link](form_url, headers={"User-Agent": UA, "Accept":
"text/html,*/*"}, timeout=15)
[Link]("upgate_form_page", r)
upgate_token = self._extract_token_from_url(form_url)
if not upgate_token:
try:
r_info = [Link](f"{UPGATE}/api/payment-form/v1/payment-info",
headers={"User-Agent": UA, "Accept": "application/json"}, timeout=12)
[Link]("upgate_payment_info_probe", r_info)
if r_info.status_code == 200:
info = r_info.json()
upgate_token = [Link]("token") or [Link]("upgateToken") or
upgate_token
except Exception as e:
[Link]("upgate_payment_info_probe_error", str(e))
if not upgate_token:
return False, "upgate token ausente"
return True, {"upgateToken": upgate_token, "formUrl": form_url}
def get_payment_info(self, upgate_token, form_url):
headers = {
"User-Agent": UA,
"Accept": "application/json",
"Authorization": f"Bearer {upgate_token}",
"Referer": form_url,
"Origin": UPGATE
}
r = [Link](f"{UPGATE}/api/payment-form/v1/payment-info",
headers=headers, timeout=15)
[Link]("payment_info", r)
if r.status_code != 200:
return False, {"status": r.status_code, "text": [Link]}
return True, [Link]()
def submit_card(self, upgate_token, form_url, card_data, prefill_token=None,
max_retries=1):
headers = {
"User-Agent": UA,
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f"Bearer {upgate_token}",
"Referer": form_url,
"Origin": UPGATE
}
def _build_payload(prefill_token):
browser_info = {
"javaEnabled": False,
"javascriptEnabled": True,
"language": "pt-BR",
"colorDepth": 24,
"screenHeight": 720,
"screenWidth": 1280,
"tzOffsetMin": 180,
"challengeWindowSize": "04"
}
if prefill_token and card_data.get("cvv"):
return {"paymentTokenId": prefill_token, "cvv":
card_data.get("cvv"), "browserInfo": browser_info}
return {
"browserInfo": browser_info,
"cardHolder": card_data.get("cardHolder"),
"cardNumber": card_data.get("cardNumber"),
"cvv": card_data.get("cvv"),
"expirationDate": card_data.get("expirationDate")
}
attempt = 0
while attempt <= max_retries:
payload = _build_payload(prefill_token)
[Link]("submit_card_payload", payload)
r = [Link](f"{UPGATE}/api/payment-form/v1/payment-form",
headers=headers, json=payload, timeout=30)
[Link](f"submit_card_response_attempt_{attempt+1}", r)
if r.status_code == 200:
try:
return True, [Link]()
except Exception:
return True, {"text": [Link]}
if r.status_code == 403 and attempt < max_retries:
attempt += 1
[Link]("submit_card_403_retry", {"attempt": attempt})
try:
r_init = [Link](form_url, headers={"User-Agent": UA},
timeout=12)
[Link]("submit_card_refresh_session", r_init)
except Exception as e:
[Link]("submit_card_refresh_error", str(e))
[Link](1.0)
continue
try:
return False, [Link]()
except Exception:
return False, {"status": r.status_code, "text": [Link]}
return False, "submit_card failed after retries"
def poll_hub_final(self, hub_token, max_attempts=30, delay=2):
headers = {"User-Agent": UA, "Accept": "application/json", "Authorization":
f"Bearer {hub_token}"}
poll_url = f"{HUB}/api/redirect-service/v2/session/state"
for i in range(max_attempts):
r = [Link](poll_url, headers=headers, timeout=15)
[Link](f"hub_final_poll_{i+1}", r)
if r.status_code == 200:
data = [Link]()
action = ([Link]("action") or "").upper()
if action in ("SUCCESS", "FAILURE", "ERROR"):
return True, data
[Link](delay)
return False, "timeout final polling"
# ---------- wait for card response (timeout 15s) ----------
def wait_for_card_response(gateway_helper, upgate_token, form_url, timeout=15,
interval=1.0):
"""
Poll payment-info until an errorMessage or clear card-related result appears.
Returns tuple (found: bool, info_or_error: dict or str).
"""
start = [Link]()
while True:
elapsed = [Link]() - start
if elapsed > timeout:
return False, "timeout waiting for card response"
ok, info = gateway_helper.get_payment_info(upgate_token, form_url)
if not ok:
gateway_helper.[Link]("wait_payment_info_error", info)
else:
err = [Link]("errorMessage")
if err:
return True, {"errorMessage": err, "info": info}
for key in ("action", "status", "paymentStatus", "result"):
val = [Link](key)
if isinstance(val, str) and [Link]():
return True, {"field": key, "value": val, "info": info}
last_card = [Link]("lastUsedCard") or {}
if isinstance(last_card, dict) and last_card.get("validationMessage"):
return True, {"validationMessage":
last_card.get("validationMessage"), "info": info}
[Link](interval)
# ---------- purchase flow ----------
class PurchaseFlow:
def __init__(self, card_index=1, total_cards=1):
self.s = [Link]()
[Link]({
"User-Agent": UA,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
"Front-Version": "11.4.45",
"Origin": BASE,
"Referer": f"{BASE}/"
})
[Link] = None
[Link] = None
self.csrf_ts = None
self.user_id = None
[Link] = None
self.card_index = card_index
self.total_cards = total_cards
def init_logger(self):
"""Inicializa o logger para este cartão específico"""
timestamp = [Link]("%Y%m%d_%H%M%S")
[Link] = Logger(f"debug_all.txt", mask_sensitive=True)
[Link] = PaymentGatewayHelper(self.s, [Link])
def _rnd_user(self):
words = ['sol','lua','mar','ceu','rio','voz','paz','rede','vida','arte']
username = ''.join([Link](words, [Link](3,5))).lower()[:24]
email = f"{username}@[Link]"
password = f"Senha{[Link](1000,9999)}!"
year = [Link](1965,2002)
month = [Link](1,12)
day = [Link](1,28)
birth = f"{year:04d}-{month:02d}-{day:02d}"
return username, email, password, birth
def _update_csrf(self, request_path="/signup/user"):
params = {"requestPath": request_path, "disableCommon": "1"}
try:
r = [Link](f"{BASE}/api/front/v3/config/initial", params=params,
timeout=10)
[Link]("initial_config " + request_path, r)
if r.status_code == 200:
cli = ([Link]().get("initial", {}).get("client", {}) or {})
[Link] = [Link]("csrfToken")
self.csrf_ts = [Link]("csrfTimestamp")
return True
return False
except Exception as e:
[Link]("initial_config error", str(e))
return False
def create_user(self):
if not self._update_csrf("/signup/user"):
return False, "CSRF inicial indisponível"
username, email, password, birth = self._rnd_user()
payload = {
"login": username,
"email": email,
"password": password,
"isModel": False,
"isStudio": False,
"fingerprint": hashlib.md5(str([Link]()).encode()).hexdigest(),
"csrfToken": [Link],
"csrfTimestamp": self.csrf_ts,
"gender": "male",
"dateOfBirth": birth,
"country": "BR",
"acceptTerms": True,
"acceptAge": True,
"subscription": False
}
r = [Link](f"{BASE}/api/front/v4/users", json=payload, timeout=20)
[Link]("create_user POST", r)
if r.status_code == 200:
d = [Link]()
self.user_id = [Link]("user", {}).get("id")
return True, {"username": username, "email": email, "password":
password, "birth": birth, "user_id": self.user_id}
return False, f"Falha HTTP {r.status_code}"
def get_purchase_config(self, promoCampaign=""):
self._update_csrf("/purchase")
attempts_params = [
{"promoCampaign": promoCampaign, "features":
"abPIXStarterb,abStarterE", "uniq": str(int([Link]()*1000))},
{"promoCampaign": "", "features": "abPIXStarterb,abStarterE", "uniq":
str(int([Link]()*1000))},
{"promoCampaign": "", "features": "", "uniq":
str(int([Link]()*1000))}
]
for params in attempts_params:
backoff = 0.5
for i in range(12):
r = [Link](f"{BASE}/api/front/purchase/digest/config",
params=params, timeout=12)
[Link](f"purchase_digest_config {params} attempt {i+1}", r)
if r.status_code == 200:
j = [Link]()
methods = [Link]("options", {}).get("methods") or {}
if methods:
return True, j
[Link](backoff)
backoff = min(backoff*1.6, 3.0)
return False, "Config indisponível após polling"
def select_valid_package(self, config_json, prefer_method="card",
prefer_tokens=45, prefer_currency="USD"):
methods = config_json.get("options", {}).get("methods") or {}
if not methods:
return None
chosen_method = None
for v in [Link]():
if ([Link]("name") or "").lower() == prefer_method:
chosen_method = v
break
if not chosen_method:
for v in [Link]():
if any(not [Link]("isHidden", False) for p in ([Link]("packages") or
{}).values()):
chosen_method = v
break
if not chosen_method:
return None
candidates = []
for key, pkg in (chosen_method.get("packages") or {}).items():
if [Link]("isHidden", False) or chosen_method.get("isDisabled",
False):
continue
billings = [Link]("billings") or {}
billing = [Link]("10") or (next(iter([Link]())) if
billings else None)
if not billing or not [Link]("purchase"):
continue
req = billing["purchase"].get("request") or {}
digest = billing["purchase"].get("digest")
[Link]({
"_key": key,
"methodName": [Link]("methodName"),
"currencyCode": [Link]("currencyCode"),
"currencySum": [Link]("currencySum"),
"tokensSum": [Link]("tokensSum"),
"request": req,
"digest": digest,
"isPromo": [Link]("isPromo"),
"promoCampaign": [Link]("promoCampaign"),
})
if not candidates:
return None
for c in candidates:
t = [Link]("tokensSum")
cur = [Link]("currencyCode")
if t and prefer_tokens and int(t) == int(prefer_tokens) and (not
prefer_currency or cur == prefer_currency):
return c
for c in candidates:
if [Link]("isPromo"):
return c
return min(candidates, key=lambda x: float([Link]("currencySum") or 1e9))
def post_purchase(self, package_obj):
if not package_obj:
return False, "Nenhum pacote selecionado"
if not self._update_csrf("/purchase"):
return False, "CSRF não atualizado"
req = dict(package_obj.get("request") or {})
digest = package_obj.get("digest")
if self.user_id:
req["user"] = f"scId.{self.user_id}"
form = {"csrfToken": [Link] or "", "csrfTimestamp": self.csrf_ts or ""}
for k, v in [Link]():
form[f"purchase[request][{k}]"] = "" if v is None else str(v)
form["purchase[digest]"] = digest
headers = {
"User-Agent": UA,
"Referer": f"{BASE}/",
"Origin": BASE,
"Front-Version": [Link]("Front-Version", "11.4.45"),
"Content-Type": "application/x-www-form-urlencoded",
}
r = [Link](f"{BASE}/api/front/purchase/digest/purchase", data=form,
headers=headers, timeout=15, allow_redirects=False)
[Link]("purchase_post", r)
if r.status_code in (302, 303):
return True, {"location": [Link]("Location"), "status":
r.status_code}
if r.status_code == 200:
try:
return True, [Link]()
except Exception:
return True, {"text": [Link]}
try:
return False, [Link]()
except Exception:
return False, {"status": r.status_code, "text": [Link]}
def run_for_card(self, card_data, prefer_method="card", prefer_tokens=45,
prefer_currency="USD", promoCampaign=""):
"""Executa o fluxo completo para um cartão específico"""
print(f"\n{'='*60}")
print(f"PROCESSANDO CARTÃO {self.card_index}/{self.total_cards}")
print(f"Cartão: {mask_card(card_data.get('cardNumber'))}")
print(f"Validade: {card_data.get('expirationDate')}")
print(f"{'='*60}\n")
# Inicializar logger para este cartão
self.init_logger()
# Registrar cartão (mascarado)
masked_card = card_data.copy()
masked_card["cardNumber"] = mask_card(card_data["cardNumber"])
masked_card["cvv"] = "***"
[Link](f"cartao_{self.card_index}_inicio", {
"cartao_mascarado": masked_card,
"timestamp": now_ts()
})
# 1. Criar usuário
print(f"[{self.card_index}/{self.total_cards}] 1. Criando usuário...")
ok, user = self.create_user()
if not ok:
print(f" ✗ Falha: {user}")
return False, {"stage": "create_user", "error": user, "card":
masked_card}
print(f" ✓ Usuário criado: {[Link]('username')}")
# 2. Obter configuração de compra
print(f"[{self.card_index}/{self.total_cards}] 2. Obtendo configuração de
compra...")
ok, cfg = self.get_purchase_config(promoCampaign=promoCampaign)
if not ok:
print(f" ✗ Falha: {cfg}")
return False, {"stage": "digest_config", "error": cfg, "card":
masked_card}
print(" ✓ Configuração obtida")
# 3. Selecionar pacote
pkg = self.select_valid_package(cfg, prefer_method=prefer_method,
prefer_tokens=prefer_tokens, prefer_currency=prefer_currency)
if not pkg:
print(" ✗ Nenhum pacote disponível")
return False, {"stage": "select_package", "error": "Nenhum pacote
visível/compatível", "card": masked_card}
print(f" ✓ Pacote selecionado: {[Link]('tokensSum')} tokens por
{[Link]('currencySum')} {[Link]('currencyCode')}")
[Link]("selected_package", {
"key": [Link]("_key"),
"methodName": [Link]("methodName"),
"currencyCode": [Link]("currencyCode"),
"currencySum": [Link]("currencySum"),
"tokensSum": [Link]("tokensSum"),
"isPromo": [Link]("isPromo"),
"promoCampaign": [Link]("promoCampaign"),
})
# 4. Postar compra
print(f"[{self.card_index}/{self.total_cards}] 3. Processando compra...")
ok, gateway_response = self.post_purchase(pkg)
if not ok:
resp = gateway_response if isinstance(gateway_response, dict) else
{"response": str(gateway_response)}
if isinstance(resp, dict) and [Link]("response", {}).get("errorCode")
== "packageUnavailable":
alt_pkg = self.select_valid_package(cfg,
prefer_method=[Link]("methodName") or prefer_method, prefer_tokens=None,
prefer_currency=[Link]("currencyCode"))
if alt_pkg and alt_pkg.get("_key") != [Link]("_key"):
[Link]("fallback_package_attempt", {"try_key":
alt_pkg.get("_key")})
ok2, gateway_response2 = self.post_purchase(alt_pkg)
if ok2:
gateway_response = gateway_response2
else:
return False, {"stage": "purchase_post", "error":
gateway_response, "fallback_error": gateway_response2, "card": masked_card}
else:
return False, {"stage": "purchase_post", "error":
gateway_response, "card": masked_card}
else:
return False, {"stage": "purchase_post", "error": gateway_response,
"card": masked_card}
# 5. Poll Hub para URL do formulário
initial_url = gateway_response.get("location") or
gateway_response.get("url")
print(f"[{self.card_index}/{self.total_cards}] 4. Aguardando gateway...")
ok, hub_poll = [Link].poll_hub_for_form_url(initial_url)
if not ok:
print(f" ✗ Falha: {hub_poll}")
return False, {"stage": "payment_bootstrap", "error": hub_poll, "card":
masked_card}
form_url = hub_poll.get("formUrl")
hub_token = hub_poll.get("hubToken")
print(" ✓ Gateway pronto")
# 6. Inicializar sessão Upgate
print(f"[{self.card_index}/{self.total_cards}] 5. Inicializando sessão de
pagamento...")
ok, upgate_init = [Link].init_upgate_session(form_url)
if not ok:
print(f" ✗ Falha: {upgate_init}")
return False, {"stage": "upgate_init", "error": upgate_init, "card":
masked_card}
upgate_token = upgate_init.get("upgateToken")
print(" ✓ Sessão iniciada")
# 7. Obter informações de pagamento
ok_info, info = [Link].get_payment_info(upgate_token, form_url)
prefill_token = None
if ok_info:
[Link]("payment_info_initial", info)
if [Link]("errorMessage"):
print(f" ✗ Erro no gateway: {[Link]('errorMessage')}")
return False, {"stage": "payment_info_init", "error":
[Link]("errorMessage"), "info": info, "card": masked_card}
last_card = [Link]("lastUsedCard") or ([Link]("prefillCards") or
[None])[0]
if isinstance(last_card, dict):
prefill_token = last_card.get("paymentTokenId") or
last_card.get("paymentToken")
else:
[Link]("payment_info_init_error", info)
prefill_token = None
# 8. Validar e processar cartão
print(f"[{self.card_index}/{self.total_cards}] 6. Processando cartão...")
if prefill_token:
# Forçar fluxo tokenizado: limpar número e expiry para evitar confusão
card_data["cardNumber"] = card_data.get("cardNumber", "") or ""
card_data["expirationDate"] = card_data.get("expirationDate", "") or ""
if not card_data.get("cvv"):
try:
card_data["cvv"] = input("Cartão pré-salvo detectado. Digite o
CVV para usar com o token: ").strip()
except Exception:
pass
# Validação básica do cartão
if card_data.get("cardNumber") and not [Link](r'^\d{12,19}$', [Link](r'\
D','', card_data.get("cardNumber"))):
[Link]("card_validation", "card number format seems invalid")
if card_data.get("cvv") and not [Link](r'^\d{3,4}$',
card_data.get("cvv")):
[Link]("card_validation", "cvv format seems invalid")
# 9. Submeter cartão
ok_submit, submit_resp = [Link].submit_card(upgate_token, form_url,
card_data, prefill_token=prefill_token, max_retries=1)
if not ok_submit:
[Link]("card_submission_failed", submit_resp)
try:
ok_info2, info2 = [Link].get_payment_info(upgate_token,
form_url)
if ok_info2 and [Link]("errorMessage"):
print(f" ✗ Erro no cartão: {[Link]('errorMessage')}")
return False, {"stage": "card_submission", "error":
[Link]("errorMessage"), "info": info2, "card": masked_card}
except Exception:
pass
print(f" ✗ Falha no envio do cartão: {submit_resp}")
return False, {"stage": "card_submission", "error": submit_resp,
"card": masked_card}
[Link]("card_submission_success", submit_resp)
print(" ✓ Cartão processado")
# 10. Aguardar resposta do cartão
print(f"[{self.card_index}/{self.total_cards}] 7. Aguardando resposta...")
found, card_result = wait_for_card_response([Link], upgate_token,
form_url, timeout=15, interval=1.0)
if found:
if isinstance(card_result, dict) and card_result.get("errorMessage"):
result_msg = card_result.get("errorMessage")
print(f" ⚠ Resposta: {result_msg}")
elif isinstance(card_result, dict) and card_result.get("field"):
result_msg = f"{card_result.get('field')} =
{card_result.get('value')}"
print(f" ⚠ Resposta: {result_msg}")
else:
result_msg = str(card_result)
print(f" ⚠ Resposta: {result_msg}")
[Link]("card_result_wait", card_result)
if isinstance(card_result, dict) and card_result.get("errorMessage"):
return False, {"stage": "payment_final", "error":
card_result.get("errorMessage"), "info": card_result.get("info"), "card":
masked_card}
else:
result_msg = "timeout (nenhuma mensagem encontrada)"
print(f" ⚠ {result_msg}")
[Link]("card_result_wait_timeout", card_result)
# 11. Verificar informações após submissão
ok_info_after, info_after = [Link].get_payment_info(upgate_token,
form_url)
if ok_info_after:
[Link]("payment_info_after_submit", info_after)
if info_after.get("errorMessage"):
print(f" ✗ Erro final: {info_after.get('errorMessage')}")
return False, {"stage": "payment_final", "error":
info_after.get("errorMessage"), "info": info_after, "card": masked_card}
else:
[Link]("payment_info_after_submit_error", info_after)
# 12. Poll final do Hub
print(f"[{self.card_index}/{self.total_cards}] 8. Finalizando
transação...")
ok_final, final = [Link].poll_hub_final(hub_token)
if not ok_final:
print(f" ✗ Timeout no poll final: {final}")
return False, {"stage": "payment_polling", "error": final, "card":
masked_card}
action = [Link]("action", "").upper()
if action == "SUCCESS":
print(f" ✓ SUCESSO! Transação aprovada")
elif action == "FAILURE":
print(f" ✗ FALHA! Transação recusada")
else:
print(f" ⚠ Resultado: {action}")
return True, {
"user": user,
"gateway": {"formUrl": form_url},
"payment_result": final,
"card": masked_card,
"result_message": result_msg
}
# ---------- main ----------
def main():
# Carregar cartões do arquivo
cards = load_cards_from_file("[Link]")
if not cards:
print("Nenhum cartão válido encontrado. Encerrando.")
return
print(f"\n{len(cards)} cartão(s) carregado(s) para teste.")
print("Iniciando processamento...\n")
resultados = []
# Processar cada cartão
for i, card_data in enumerate(cards, 1):
# Criar nova sessão para cada cartão
flow = PurchaseFlow(card_index=i, total_cards=len(cards))
try:
ok, result = flow.run_for_card(
card_data=card_data,
prefer_method="card",
prefer_tokens=45,
prefer_currency="USD",
promoCampaign=""
)
# Adicionar resultado
result_item = {
"cartao_numero": mask_card(card_data["cardNumber"]),
"cartao_validade": card_data["expirationDate"],
"indice": i,
"sucesso": ok,
"resultado": result
}
[Link](result_item)
# Aguardar um pouco entre os cartões
if i < len(cards):
print(f"\nAguardando 2 segundos antes do próximo cartão...")
[Link](2)
except Exception as e:
print(f"\n✗ ERRO INESPERADO no cartão {i}: {e}")
[Link]({
"cartao_numero": mask_card(card_data["cardNumber"]),
"cartao_validade": card_data["expirationDate"],
"indice": i,
"sucesso": False,
"erro": str(e)
})
# Continuar com o próximo cartão mesmo se um falhar
# Salvar resultados consolidados
with open("[Link]", "w", encoding="utf-8") as f:
[Link]({
"timestamp": now_ts(),
"total_cartoes": len(cards),
"resultados": resultados
}, f, ensure_ascii=False, indent=2)
# Exibir resumo
print(f"\n{'='*60}")
print("RESUMO DO PROCESSAMENTO")
print(f"{'='*60}")
sucessos = sum(1 for r in resultados if [Link]("sucesso"))
falhas = len(resultados) - sucessos
print(f"Total processados: {len(resultados)}")
print(f"Sucessos: {sucessos}")
print(f"Falhas: {falhas}")
if falhas > 0:
print("\nCartões com falha:")
for r in resultados:
if not [Link]("sucesso"):
print(f" • {r['cartao_numero']} - {[Link]('erro', 'Erro
desconhecido')}")
print(f"\nResultados salvos em: [Link]")
print(f"Logs detalhados em: debug_all.txt")
# ---------- self-clean relauncher ----------
def _is_windows():
return [Link] == "nt"
def relaunch_in_temp(copy_list=None, remove_before_run=None,
env_flag="RUN_CLEANED"):
"""
Cria um diretório temporário, copia arquivos listados em copy_list (por padrão
apenas o script atual),
remove arquivos listados em remove_before_run no diretório atual (para começar
limpo),
executa a cópia do script no tempdir com a variável de ambiente env_flag
definida,
aguarda término, remove o tempdir e retorna o código de saída do processo
filho.
"""
# Se já estivermos na execução "limpa", não relançar
if [Link](env_flag) == "1":
return None
cwd = [Link]()
script_path = Path(__file__).resolve()
script_name = script_path.name
# Arquivos a copiar por padrão: apenas o próprio script
if copy_list is None:
copy_list = [script_name]
# Arquivos a remover antes de rodar (logs/resultados) no diretório atual
if remove_before_run is None:
remove_before_run = ["debug_all.txt", "[Link]"]
# Remover arquivos antigos no diretório atual (silencioso)
for fn in remove_before_run:
try:
p = cwd / fn
if [Link]():
[Link]()
except Exception:
pass
# Criar tempdir
tmpdir = Path([Link](prefix="run_clean_"))
try:
# Copiar arquivos necessários para o tempdir
for fn in copy_list:
src = cwd / fn
if [Link]():
dst = tmpdir / [Link]
shutil.copy2(src, dst)
# Copiar também o arquivo [Link] se existir
lista_src = cwd / "[Link]"
if lista_src.exists():
shutil.copy2(lista_src, tmpdir / "[Link]")
# Construir comando para executar o script copiado
python_exe = [Link] or ("python" if not _is_windows() else
"python")
cmd = [python_exe, str(tmpdir / script_name)]
# Preparar ambiente filho com flag para evitar relançamento recursivo
child_env = [Link]()
child_env[env_flag] = "1"
# Executar em novo processo e aguardar término
proc = [Link](cmd, env=child_env)
[Link]()
return [Link]
finally:
# Remover tempdir (tenta várias vezes se houver travamento)
try:
[Link](tmpdir)
except Exception:
pass
# Entry point: relaunch in tempdir for a clean run, then execute main in the child
process
if __name__ == "__main__":
# If not already running cleaned, relaunch in tempdir and exit parent
if [Link]("RUN_CLEANED") != "1":
rc = relaunch_in_temp(copy_list=[Path(__file__).name],
remove_before_run=["debug_all.txt", "[Link]"], env_flag="RUN_CLEANED")
if rc is not None:
[Link](rc)
# If we are the cleaned child (RUN_CLEANED=1) or relaunch returned None, run
main normally
main()