OTA (over-the-air) updates let you push JavaScript changes, bug fixes, and new features to users without waiting for App Store review. Expo's hosted service (EAS Update) works well, but there are good reasons to self-host: cost control at scale, data sovereignty requirements, or simply wanting to own your infrastructure.
This article walks through how I built a self-hosted Expo update server with Django REST Framework and Tigris S3. It's the architecture my Curtain Estimator app uses for OTA updates on both iOS and Android.
Why Self-Host Expo Updates?#
Cost Control#
EAS Update's pricing grows with usage. If you push frequent updates to a large user base, self-hosting on cheap S3-compatible storage like Tigris can save real money.
Data Sovereignty#
Some industries require full control over where application assets are stored. Self-hosting keeps every update bundle on your own infrastructure.
Custom Business Logic#
Need to push updates to specific user segments? Want to A/B test different bundles? With a custom server, you control the entire update flow.
No Vendor Lock-In#
Your update infrastructure doesn't depend on Expo's service availability or pricing changes.
Architecture Overview#
The system has four main components:
- Django backend: serves update manifests and stores metadata
- Tigris S3 storage: hosts the actual bundle and asset files
- Publishing pipeline: scripts that export, upload, and register updates
- Mobile app: configured to check your server for updates
Here's how it works:
┌─────────────────┐
│ Mobile App │
│ (expo-updates) │
└────────┬────────┘
│ 1. Request manifest
│ (with headers: platform, runtime-version)
↓
┌─────────────────┐
│ Django Server │
│ /api/expo- │◄─── 2. Query DB for latest update
│ updates/ │
│ manifest/ │
└────────┬────────┘
│ 3. Generate presigned URLs
│
↓
┌─────────────────┐
│ Tigris S3 │
│ (Asset Files) │◄─── 4. App downloads bundles directly
└─────────────────┘
Implementation Details#
Django Models#
The foundation is two Django models: ExpoUpdate and ExpoUpdateAsset.
ExpoUpdate stores metadata for each update:
import uuid

from django.db import models

class ExpoUpdate(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    runtime_version = models.CharField(max_length=50, db_index=True)
    platform = models.CharField(
        max_length=10,
        choices=[("ios", "iOS"), ("android", "Android")],
        db_index=True,
    )
    is_active = models.BooleanField(default=True, db_index=True)
    manifest_data = models.JSONField()
    description = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [
            models.Index(fields=["runtime_version", "platform", "is_active", "-created_at"])
        ]
Key design decisions:
- runtime_version: matches runtimeVersion in app.json. This is critical: clients only download updates that match their runtime version.
- platform: iOS and Android have different bundles, so each platform gets its own update.
- is_active: enables rollback by deactivating a problematic update.
- manifest_data: stores the full Expo Updates v1 protocol manifest as JSON.
ExpoUpdateAsset tracks the individual files:
class ExpoUpdateAsset(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    update = models.ForeignKey(ExpoUpdate, on_delete=models.CASCADE, related_name="assets")
    hash = models.CharField(max_length=255, db_index=True)
    key = models.CharField(max_length=255)
    content_type = models.CharField(max_length=100)
    file_extension = models.CharField(max_length=10)
    file_path = models.CharField(max_length=500)
    file_size = models.IntegerField(default=0)
Assets are referenced by their SHA-256 hash, making them immutable and cacheable. The same asset can be shared across multiple updates.
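As a concrete illustration of the hashing above, here is roughly how an asset hash can be computed. The Expo Updates protocol expects base64url-encoded SHA-256 digests with padding stripped; the function name is my own, not the production code.

```python
# Sketch: compute the base64url-encoded SHA-256 digest the Expo Updates
# protocol uses to identify an asset (trailing '=' padding removed).
import base64
import hashlib

def calculate_asset_hash(content: bytes) -> str:
    digest = hashlib.sha256(content).digest()
    return base64.urlsafe_b64encode(digest).decode().rstrip("=")

bundle_hash = calculate_asset_hash(b"console.log('hello')")
```

Because the hash is derived purely from the file contents, two identical files always get the same identity, which is what makes assets deduplicatable across updates.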
The Manifest Endpoint#
The /api/expo-updates/manifest/ endpoint is the heart of the system. It implements the Expo Updates v1 protocol.
@action(detail=False, methods=["get"], url_path="manifest")
def manifest(self, request):
    # Extract required headers
    protocol_version = request.META.get("HTTP_EXPO_PROTOCOL_VERSION")
    platform = request.META.get("HTTP_EXPO_PLATFORM")
    runtime_version = request.META.get("HTTP_EXPO_RUNTIME_VERSION")

    # Validate protocol version
    if protocol_version != "1":
        return Response(
            {"error": f"Unsupported protocol version: {protocol_version}"},
            status=400,
        )

    # Find the latest active update for this runtime + platform
    update = ExpoUpdate.objects.filter(
        runtime_version=runtime_version,
        platform=platform,
        is_active=True,
    ).order_by("-created_at").first()

    # No update available - client uses the embedded bundle
    if not update:
        response = Response(status=204)
        response["expo-protocol-version"] = "1"
        return response

    # Generate presigned URLs for all assets
    manifest_data = self._generate_manifest_with_presigned_urls(update)
    return Response(manifest_data, status=200)
Key details:
- 204 No Content: when no update is found, return 204 and the app falls back to its embedded bundle.
- Presigned URLs: time-limited URLs let the app download directly from the Tigris CDN, which is much faster than proxying through Django.
- Headers: the response must include the expo-protocol-version header.
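The helper `_generate_manifest_with_presigned_urls` is referenced above but not shown. Here is a minimal sketch of what such a helper could look like; the function name, the `asset_paths` mapping, and the injected `presign` callable are all illustrative assumptions, and in production `presign` would wrap boto3's `generate_presigned_url`.

```python
# Hypothetical sketch: swap each asset URL in a stored manifest for a
# fresh presigned URL. `presign` is any callable mapping an S3 file
# path to a time-limited URL.
import copy
from typing import Callable, Dict

def manifest_with_presigned_urls(
    manifest_data: dict,
    asset_paths: Dict[str, str],   # asset key -> S3 file path
    presign: Callable[[str], str],
) -> dict:
    manifest = copy.deepcopy(manifest_data)  # never mutate the stored copy
    launch = manifest["launchAsset"]
    launch["url"] = presign(asset_paths[launch["key"]])
    for asset in manifest.get("assets", []):
        asset["url"] = presign(asset_paths[asset["key"]])
    return manifest

# Example with a fake signer:
stored = {
    "launchAsset": {"key": "bundle", "url": ""},
    "assets": [{"key": "icon", "url": ""}],
}
paths = {"bundle": "updates/entry.hbc", "icon": "updates/icon.png"}
fresh = manifest_with_presigned_urls(
    stored, paths, lambda p: f"https://cdn.example/{p}?sig=abc"
)
```

Regenerating URLs on every manifest request is what keeps the one-hour expiry safe: a client that checks for updates always receives links that are still valid.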
Publishing Updates#
The publishing workflow is automated with a Django management command plus a shell-script wrapper.
The management command (publish_expo_update.py) handles:
- Reading the output of expo export
- Calculating SHA-256 hashes for every asset
- Uploading the bundle and assets to Tigris S3 in parallel
- Creating database records for the update and its assets
- Optionally uploading an import JSON for production sync
The core flow looks like this:
def _publish_platform(self, platform, runtime_version, export_dir, ...):
    # 1. Find the bundle file
    bundle_files = list(bundle_dir.glob("entry-*.hbc"))
    bundle_file = bundle_files[0]

    # 2. Calculate its hash
    with open(bundle_file, "rb") as f:
        bundle_content = f.read()
    bundle_hash = self._calculate_hash(bundle_content)

    # 3. Collect all assets and their hashes
    for asset_file in assets_dir.rglob("*"):
        # Calculate hash, determine content type...
        assets_metadata.append({...})

    # 4. Upload to S3 in parallel
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(upload_asset, a): a for a in assets_metadata}

    # 5. Create database records
    with transaction.atomic():
        # Deactivate previous updates
        ExpoUpdate.objects.filter(
            runtime_version=runtime_version,
            platform=platform,
            is_active=True,
        ).update(is_active=False)

        # Create the new update
        update = ExpoUpdate.objects.create(...)
The shell script (publish-ota-update.sh) provides a friendlier interface:
# Publish to local environment
./scripts/publish-ota-update.sh ios
# Publish to production
./scripts/publish-ota-update.sh ios --production
# Dry run to validate
./scripts/publish-ota-update.sh --dry-run
Key features:
- Automatically exports the app with production environment variables
- Uploads assets in parallel for speed
- Supports single-platform or multi-platform updates
- Syncs to production through an API endpoint
Production Sync via API#
For production deployments, the system uses a neat two-step flow:
- Local publish: upload assets to Tigris and create a JSON snapshot
- Remote import: call the production API with the S3 path to import the metadata
This approach keeps production credentials off the local machine:
@action(detail=False, methods=["post"], url_path="import-update")
def import_update(self, request):
    # Authenticate via Bearer token
    secret = settings.OTA_IMPORT_SECRET
    token = request.META.get("HTTP_AUTHORIZATION", "")[7:]  # Strip "Bearer "
    if not hmac.compare_digest(token, secret):
        return Response({"error": "Invalid token"}, status=401)

    # Download the import JSON from Tigris
    s3_key = request.data.get("s3_key")
    obj = s3_client.get_object(Bucket=bucket_name, Key=s3_key)
    data = json.loads(obj["Body"].read())

    # Import into the production database
    with transaction.atomic():
        ExpoUpdate.objects.update_or_create(id=data["id"], defaults={...})
        for asset_data in data["assets"]:
            ExpoUpdateAsset.objects.update_or_create(...)

    # Clean up the import JSON
    s3_client.delete_object(Bucket=bucket_name, Key=s3_key)
Mobile App Configuration#
Configure the update URL and runtime version in app.json:
{
  "expo": {
    "runtimeVersion": "1.0.0",
    "updates": {
      "url": "https://your-server.com/api/expo-updates/manifest/"
    }
  }
}
Important: runtimeVersion must match between the app and the server. When you change native code or upgrade the Expo SDK, increment the runtime version and publish a new update.
Using Tigris Storage#
Tigris is an S3-compatible object storage service that costs far less than AWS S3 and includes global edge caching.
Configuration in Django:
# settings.py
BUCKET_NAME = os.getenv("BUCKET_NAME")
AWS_ENDPOINT_URL_S3 = os.getenv("AWS_ENDPOINT_URL_S3")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION", "auto")
Creating the S3 client:
import boto3

def create_s3_client(endpoint_url, region, access_key, secret_key):
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        region_name=region,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
Asset downloads use presigned URLs for direct CDN delivery:
presigned_url = s3_client.generate_presigned_url(
    "get_object",
    Params={"Bucket": bucket_name, "Key": asset.file_path},
    ExpiresIn=3600,  # 1 hour
)
Security Considerations#
Authentication#
The manifest endpoint is deliberately unauthenticated: the mobile app needs updates before a user ever logs in. The import endpoint, however, requires a shared secret:
OTA_IMPORT_SECRET = os.getenv("OTA_IMPORT_SECRET")

# Constant-time comparison prevents timing attacks
if not hmac.compare_digest(token, secret):
    return Response({"error": "Invalid token"}, status=401)
Asset Integrity#
Every asset is verified by its SHA-256 hash. If an asset is tampered with, the hash won't match and the update fails.
Presigned URLs#
URLs expire after one hour, preventing long-lived unauthorized access to the bundles.
Performance Optimizations#
Database Indexing#
The composite index on (runtime_version, platform, is_active, -created_at) keeps manifest queries fast even with thousands of updates:
class Meta:
    indexes = [
        models.Index(fields=["runtime_version", "platform", "is_active", "-created_at"])
    ]
Parallel Uploads#
The publish script uses a ThreadPoolExecutor to upload assets concurrently:
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(upload_asset, asset): asset for asset in assets}
    for future in as_completed(futures):
        # Track progress
For a typical update with 50 assets, publish time dropped from about 2 minutes to 15 seconds.
CDN Distribution#
Tigris includes global edge caching. Assets are automatically served from edge nodes near your users, cutting download times.
Operational Workflows#
Day-to-Day Development#
# 1. Make code changes in mobile app
cd mobile-app && git commit -am "Fix bug"
# 2. Publish OTA update to local environment
yarn publish-update:ios
# 3. Test on device
# App automatically downloads and applies update
Production Deployment#
# 1. Publish to production
yarn publish-update:prod:ios
# 2. Monitor
# Check Django admin for update records
# Verify assets in Tigris dashboard
Rollback#
# Mark problematic update as inactive in Django admin
# or via management shell:
python manage.py shell
>>> from jobs.models import ExpoUpdate
>>> bad_update = ExpoUpdate.objects.get(id="uuid-here")
>>> bad_update.is_active = False
>>> bad_update.save()
# Clients will now receive the previous active update
Cost Analysis#
Taking my Curtain Estimator app (~500 active users) as an example:
Tigris storage:
- Storage: ~200 MB (update history) = $0.02/month
- Egress: ~50 GB/month (update downloads) = $1.00/month
- Total: ~$1/month
Django hosting (Fly.io):
- Included in the existing app hosting
- Incremental cost: $0
Total OTA infrastructure cost: ~$1/month
Compare that with EAS Update's pricing, where similar usage runs $300-500 per year. The self-hosted approach pays for itself immediately.
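As a sanity check on these figures, the implied unit rates can be folded into a tiny cost function. The per-GB rates below are reverse-engineered from the article's own numbers, not an official Tigris price sheet.

```python
# Back-of-envelope cost model. Rates are implied by the numbers above
# (0.2 GB stored ≈ $0.02/mo, 50 GB egress ≈ $1.00/mo), not real pricing.
STORAGE_PER_GB_MONTH = 0.10
EGRESS_PER_GB = 0.02

def monthly_cost(storage_gb: float, egress_gb: float) -> float:
    return storage_gb * STORAGE_PER_GB_MONTH + egress_gb * EGRESS_PER_GB

current = monthly_cost(0.2, 50)    # today's usage
scaled = monthly_cost(1.0, 500)    # roughly 10x the traffic
```

Even at ten times the traffic, the bill stays in the tens of dollars per year, which is where the gap with hosted pricing comes from.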
Monitoring and Debugging#
Logging#
The ViewSet logs every manifest request:
logger.info(f"Manifest request: platform={platform}, runtime={runtime_version}")
Django Admin#
Register the models in Django admin for easy inspection:
@admin.register(ExpoUpdate)
class ExpoUpdateAdmin(admin.ModelAdmin):
    list_display = ["platform", "runtime_version", "is_active", "created_at"]
    list_filter = ["platform", "is_active", "runtime_version"]
    search_fields = ["description"]
Client-Side Debugging#
Enable update logging in the app:
import * as Updates from 'expo-updates';

Updates.checkForUpdateAsync().then(update => {
  console.log('Update available:', update.isAvailable);
  console.log('Manifest:', update.manifest);
});
Limitations and Gotchas#
Runtime Version Matching#
The most common pitfall: clients only download updates matching their runtime version. If your app's runtime version is 1.0.0 but you publish an update for 1.0.1, the update will never be delivered.
The fix: keep the runtime version in sync with your app builds, and only bump it when native code changes.
The createdAt Timestamp#
The expo-updates client compares the manifest's createdAt timestamp against the embedded bundle's commitTime. An update is only applied when createdAt is newer.
Fix: the ViewSet overwrites createdAt with the database timestamp:
manifest_data["createdAt"] = update.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
In local development, if you build a binary after publishing an OTA, republish the OTA so its timestamp is newer.
Manifest Asset key Values#
The key field of each asset in the manifest is used by expo-updates for caching. It must be a deterministic hash (for example, an MD5 of the file name), not a random UUID or arbitrary string. With random values, the client may fail to cache or retrieve assets correctly, and updates silently break after the first successful load. Thanks to reader Raphael Mutschler for catching this: his updates worked exactly once before he found the problem.
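A minimal sketch of such a deterministic key, assuming MD5 of the file name as suggested above (MD5 serves as a stable identifier here, not a security measure):

```python
# Derive a stable, deterministic asset key from the file name so
# expo-updates can cache the asset consistently across manifest requests.
import hashlib

def asset_key(file_name: str) -> str:
    return hashlib.md5(file_name.encode()).hexdigest()

key = asset_key("assets/icon.png")  # same input always yields the same key
```

Because the key never changes between requests, the client can recognize an already-downloaded asset instead of treating it as new every time.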
The expoClient App Config#
If your app uses Linking, Constants, or anything else that reads the app config at runtime, you need to include the app config in the manifest's extra.expoClient field. Otherwise the app may work on first launch but crash or fail to start after being closed and reopened. This happens because expo-updates replaces the embedded manifest with the OTA manifest; without expoClient, those APIs lose the config they depend on. Thanks again to Raphael Mutschler for spotting this one.
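A sketch of embedding the config at publish time, assuming the `expo` section of app.json is available as a dict (the helper name and the sample config values are illustrative, not the production code):

```python
# Copy the app config into the manifest's extra.expoClient field so
# runtime APIs (Constants, Linking, ...) still find their config after
# expo-updates swaps in the OTA manifest.
def attach_expo_client_config(manifest: dict, app_json: dict) -> dict:
    manifest.setdefault("extra", {})["expoClient"] = app_json.get("expo", {})
    return manifest

manifest = attach_expo_client_config(
    {"id": "some-update"},
    {"expo": {"name": "Curtain Estimator", "scheme": "myscheme"}},
)
```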
Asset Cleanup#
Old updates accumulate in Tigris. Cleanup is currently manual:
# Delete updates older than 30 days
from datetime import timedelta
from django.utils import timezone

cutoff = timezone.now() - timedelta(days=30)
old_updates = ExpoUpdate.objects.filter(created_at__lt=cutoff, is_active=False)
for update in old_updates:
    # Delete assets from S3
    for asset in update.assets.all():
        s3_client.delete_object(Bucket=bucket_name, Key=asset.file_path)
    # Delete DB records
    update.delete()
Consider wiring this into a scheduled task.
Future Improvements#
Gradual Rollout Control#
Add a rollout_percentage field to release updates incrementally:
rollout_percentage = models.IntegerField(default=100)

# In the manifest view:
if update.rollout_percentage < 100:
    # Hash the user ID and check whether they're in the rollout group
    user_hash = int(hashlib.sha256(user_id.encode()).hexdigest(), 16)
    if (user_hash % 100) >= update.rollout_percentage:
        return Response(status=204)  # No update
Multi-Environment Support#
Add an environment field to serve different updates to staging and production:
environment = models.CharField(max_length=20, default="production")

# Client sends the environment in a custom header
environment = request.META.get("HTTP_X_UPDATE_ENVIRONMENT", "production")
update = ExpoUpdate.objects.filter(environment=environment, ...).first()
Analytics#
Track download metrics:
class ExpoUpdateDownload(models.Model):
    update = models.ForeignKey(ExpoUpdate, on_delete=models.CASCADE)
    user_id = models.CharField(max_length=255, null=True)
    platform = models.CharField(max_length=10)
    downloaded_at = models.DateTimeField(auto_now_add=True)
Summary#
Self-hosting Expo OTA updates with Django and S3-compatible storage is simpler than it sounds. The whole system comes down to:
- ~150 lines of Django models and views
- ~200 lines of publish scripts
- ~$1/month in infrastructure costs
The code samples here come from my production Curtain Estimator app; adapt them to your own needs. Reader Raphael Mutschler has also published an independent implementation, expo-ota-server, if you want a ready-made starting point.
For the full specification, see the Expo Updates Protocol Specification.
Full Scripts#
publish-ota-update.sh#
#!/bin/bash
# Publish OTA Update Script
#
# Usage:
# ./scripts/publish-ota-update.sh # Both platforms (LOCAL)
# ./scripts/publish-ota-update.sh ios # iOS only (LOCAL)
# ./scripts/publish-ota-update.sh ios --production # iOS to PRODUCTION
# ./scripts/publish-ota-update.sh --dry-run # Test without uploading
# ./scripts/publish-ota-update.sh ios --description "Bug fixes"
set -e
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
RED='\033[0;31m'
NC='\033[0m'
# Parse arguments
PLATFORM="all"
PRODUCTION=false
DRY_RUN=false
DESCRIPTION=""
while [[ $# -gt 0 ]]; do
case $1 in
ios|android|all) PLATFORM="$1"; shift ;;
--production|--prod) PRODUCTION=true; shift ;;
--dry-run) DRY_RUN=true; shift ;;
--description) DESCRIPTION="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [ios|android|all] [--production] [--dry-run] [--description \"msg\"]"
exit 0 ;;
*) echo -e "${RED}Unknown: $1${NC}"; exit 1 ;;
esac
done
# Auto-detect project root (support running from mobile-app/ via yarn)
if [ -d "mobile-app" ]; then
: # already at project root
elif [ -d "../mobile-app" ]; then
cd ..
else
echo -e "${RED}Error: Run from project root or mobile-app/${NC}" && exit 1
fi
docker info > /dev/null 2>&1 || { echo -e "${RED}Error: Docker not running${NC}"; exit 1; }
# Log file — verbose output goes here, terminal gets summary only
LOG_FILE="ota-publish-$(date +%Y%m%d-%H%M%S).log"
echo -e "${BLUE}═══ Expo OTA Publisher ═══${NC}"
echo -e "Platform: ${PLATFORM} Production: ${PRODUCTION} Log: ${LOG_FILE}"
echo ""
# ── Step 1: Export with production env vars ──
echo -e "${YELLOW}Step 1: Exporting mobile app...${NC}"
cd mobile-app
# Load production env vars from eas.json (adapt these to your app's env vars)
if [ -f "eas.json" ] && command -v jq &> /dev/null; then
for key in $(jq -r '.build.production.env // {} | keys[]' eas.json); do
export "$key"="$(jq -r ".build.production.env.$key" eas.json)"
done
fi
OTA_EXPORT_DIR="dist-ota"
if [ "$PLATFORM" = "all" ]; then
npx expo export --platform ios --output-dir "$OTA_EXPORT_DIR" >> "../$LOG_FILE" 2>&1
npx expo export --platform android --output-dir "$OTA_EXPORT_DIR" >> "../$LOG_FILE" 2>&1
else
npx expo export --platform "$PLATFORM" --output-dir "$OTA_EXPORT_DIR" >> "../$LOG_FILE" 2>&1
fi
cd ..
echo -e "${GREEN}✓ Export complete${NC}"
# ── Step 2: Upload to Tigris + create DB records ──
echo -e "${YELLOW}Step 2: Publishing to Tigris...${NC}"
CMD_ARGS="--platform $PLATFORM --export-dir mobile-app/$OTA_EXPORT_DIR"
[ "$DRY_RUN" = true ] && CMD_ARGS="$CMD_ARGS --dry-run"
[ -n "$DESCRIPTION" ] && CMD_ARGS="$CMD_ARGS --description \"$DESCRIPTION\""
[ "$PRODUCTION" = true ] && CMD_ARGS="$CMD_ARGS --production-sync"
PUBLISH_OUTPUT=$(eval docker compose exec -T django python manage.py publish_expo_update $CMD_ARGS 2>&1)
echo "$PUBLISH_OUTPUT" >> "$LOG_FILE"
# Print key lines to terminal
echo "$PUBLISH_OUTPUT" | grep -E '✓ Published:|Deactivated|OTA_S3_KEY=|DRY RUN|ERROR|Failed' || true
# ── Step 3 (production only): Sync via API endpoint ──
if [ "$PRODUCTION" = true ] && [ "$DRY_RUN" = false ]; then
echo -e "${YELLOW}Step 3: Syncing to production...${NC}"
# Extract S3 key(s) from management command output
S3_KEYS=$(echo "$PUBLISH_OUTPUT" | grep -o 'OTA_S3_KEY=[^ ]*' | sed 's/OTA_S3_KEY=//')
[ -z "$S3_KEYS" ] && echo -e "${RED}Error: No OTA_S3_KEY found in publish output${NC}" && exit 1
# Read OTA_IMPORT_SECRET from .env
if [ -f ".env" ]; then
OTA_IMPORT_SECRET=$(grep -E '^OTA_IMPORT_SECRET=' .env | sed 's/^OTA_IMPORT_SECRET=//')
fi
[ -z "$OTA_IMPORT_SECRET" ] && echo -e "${RED}Error: OTA_IMPORT_SECRET not found in .env${NC}" && exit 1
PROD_URL="https://your-app.fly.dev/api/expo-updates/import-update/"
for S3_KEY in $S3_KEYS; do
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$PROD_URL" \
-H "Authorization: Bearer $OTA_IMPORT_SECRET" \
-H "Content-Type: application/json" \
-d "{\"s3_key\": \"$S3_KEY\"}")
HTTP_CODE=$(echo "$RESPONSE" | tail -1)
BODY=$(echo "$RESPONSE" | sed '$d')
echo "$BODY" >> "$LOG_FILE"
if [ "$HTTP_CODE" = "200" ]; then
UPDATE_ID=$(echo "$BODY" | python3 -c "import sys,json; print(json.load(sys.stdin)['update_id'])" 2>/dev/null || echo "unknown")
PLAT=$(echo "$BODY" | python3 -c "import sys,json; print(json.load(sys.stdin)['platform'])" 2>/dev/null || echo "unknown")
NOTIF_COUNT=$(echo "$BODY" | python3 -c "import sys,json; print(json.load(sys.stdin).get('notifications_sent', 0))" 2>/dev/null || echo "0")
echo -e "${GREEN}✓ ${PLAT}: ${UPDATE_ID}${NC}"
echo -e "${GREEN}✓ Sent ${NOTIF_COUNT} push notification(s) to production users${NC}"
else
echo -e "${RED}Error: HTTP $HTTP_CODE${NC}"
echo "$BODY"
exit 1
fi
done
fi
echo ""
echo -e "${GREEN}═══ ✓ Done ═══${NC}"
echo -e "Full log: ${LOG_FILE}"
sync-ota-to-prod.sh#
#!/bin/bash
# Sync OTA Update to Production Database
# This script copies an OTA update record from local to production database
# The bundles are already in Tigris (shared between local and production)
set -e
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
# Get the update ID from arguments or use the latest
UPDATE_ID="$1"
if [ -z "$UPDATE_ID" ]; then
echo -e "${YELLOW}No update ID provided, using latest iOS update...${NC}"
UPDATE_ID=$(docker compose exec -T django python manage.py shell -c "
from jobs.models import ExpoUpdate
update = ExpoUpdate.objects.filter(platform='ios').order_by('-created_at').first()
print(update.id if update else '')
" | tail -1 | tr -d '\r\n')
fi
echo -e "${BLUE}Syncing OTA Update to Production${NC}"
echo -e "${BLUE}Update ID: $UPDATE_ID${NC}"
echo ""
# Export the update data from local database
echo -e "${YELLOW}Step 1/2: Exporting from local database...${NC}"
docker compose exec -T django python manage.py shell -c "
import json
from jobs.models import ExpoUpdate, ExpoUpdateAsset
try:
    update = ExpoUpdate.objects.get(id='$UPDATE_ID')
except ExpoUpdate.DoesNotExist:
    print('ERROR: Update not found')
    exit(1)
# Export update
print(json.dumps({
'id': str(update.id),
'runtime_version': update.runtime_version,
'platform': update.platform,
'is_active': update.is_active,
'manifest_data': update.manifest_data,
'description': update.description,
'assets': [
{
'id': str(asset.id),
'hash': asset.hash,
'key': asset.key,
'content_type': asset.content_type,
'file_extension': asset.file_extension,
'file_path': asset.file_path,
'file_size': asset.file_size,
}
for asset in update.assets.all()
]
}))
" > /tmp/ota_sync_$UPDATE_ID.json
# Check if export succeeded
if [ ! -s /tmp/ota_sync_$UPDATE_ID.json ]; then
echo -e "${RED}Failed to export update${NC}"
exit 1
fi
echo -e "${GREEN}✓ Exported update data${NC}"
echo ""
# Import to production database
echo -e "${YELLOW}Step 2/2: Importing to production database...${NC}"
# Create Python script for import
cat > /tmp/ota_import.py << 'EOFPY'
import json
from jobs.models import ExpoUpdate, ExpoUpdateAsset
with open('/tmp/ota_data.json', 'r') as f:
    data = json.load(f)
# Create or update the ExpoUpdate record
update, created = ExpoUpdate.objects.update_or_create(
id=data['id'],
defaults={
'runtime_version': data['runtime_version'],
'platform': data['platform'],
'is_active': data['is_active'],
'manifest_data': data['manifest_data'],
'description': data['description'],
}
)
print(f"Update: {'created' if created else 'updated'}")
print(f" ID: {update.id}")
print(f" Platform: {update.platform}")
print(f" Runtime: {update.runtime_version}")
print(f" Description: {update.description}")
# Create assets
assets_created = 0
for asset_data in data['assets']:
    _, created = ExpoUpdateAsset.objects.update_or_create(
        id=asset_data['id'],
        defaults={
            'update': update,
            'hash': asset_data['hash'],
            'key': asset_data['key'],
            'content_type': asset_data['content_type'],
            'file_extension': asset_data['file_extension'],
            'file_path': asset_data['file_path'],
            'file_size': asset_data['file_size'],
        }
    )
    if created:
        assets_created += 1
print(f"Assets: {assets_created} created, {len(data['assets']) - assets_created} updated")
print(f"✓ OTA update successfully synced to production!")
EOFPY
# Copy JSON to temp location and import
flyctl ssh console -C "cat > /tmp/ota_data.json" < /tmp/ota_sync_$UPDATE_ID.json
flyctl ssh console -C "cat > /tmp/ota_import.py" < /tmp/ota_import.py
flyctl ssh console -C "python manage.py shell < /tmp/ota_import.py"
# Cleanup
rm /tmp/ota_sync_$UPDATE_ID.json /tmp/ota_import.py
echo ""
echo -e "${GREEN}╔════════════════════════════════════════════════════════════╗${NC}"
echo -e "${GREEN}║ ✓ OTA Update Synced to Production! ║${NC}"
echo -e "${GREEN}╚════════════════════════════════════════════════════════════╝${NC}"
echo ""
# Verify
echo -e "${BLUE}Verifying production...${NC}"
flyctl ssh console -C "python manage.py shell -c \"
from jobs.models import ExpoUpdate
count = ExpoUpdate.objects.count()
latest = ExpoUpdate.objects.order_by('-created_at').first()
print(f'Total OTA updates: {count}')
if latest:
    print(f'Latest: {latest.platform} - {latest.description}')
\""
