# API Cookbook
> [!CAUTION]
> **API Maintenance & Schema Drift.** The franklinwh-cloud library identifies itself to the FranklinWH Cloud API using a `softwareversion` request header (default: `APP2.4.1`). This value is sent with every request for the lifetime of the session.
>
> **What this header does:** it identifies the client as a known app version. Testing across versions `APP1.0.0` through `APP99.0.0` showed identical responses on all tested endpoints; the header does not appear to gate specific fields or alter payload structures in the current API. Its primary observed role is authentication token negotiation and, likely, server-side telemetry/analytics.
>
> **Schema drift detection:** the library runs a built-in canary trap (`Client._check_canary_trap`) that scans every response for a `softwareVersion` field and fires a warning plus a disk dump if a version newer than the certified baseline (`APP2.11.0`) is detected. This is the real mechanism for detecting upstream API changes. Always reference docs/OPENAPI_GENERATOR.md if payloads shift unexpectedly.
>
> To override the header for a single diagnostic call:
>
> ```shell
> franklinwh-cli fetch --app-version APP2.11.0 ...
> ```
>
> To set it globally, pass `emulate_app_version="APP2.11.0"` to `Client(...)` or `PasswordAuth(...)`.
Practical recipes for the FranklinWH Cloud API. Each recipe is copy-paste ready.
Prerequisites: See SANDBOX_SETUP.md for venv and credentials setup. Full method reference: See API_REFERENCE.md for all 70+ methods with args.
## API Anti-Patterns & Polling Best Practices
Before building automated dashboards or backend integrators (like the FranklinWH Energy Manager), you must separate your polling loops into two distinct pipelines.
What NOT To Do:
Do not poll static hardware data or compliance rules (`get_connectivity_overview`, `get_device_info`, `getComplianceDetailById`, `get_smart_circuits_info`) at the same frequency as power telemetry. Tying static fetches to your 5-10 second telemetry tick will aggressively throttle the aGate's internal MQTT relay, flood AWS with useless calls, and quickly cause `DeviceTimeoutException` API crashes.
What To Do (The Best Practice Architecture):
1. **The Fast Loop** (`get_stats`): poll `get_stats()` rapidly (e.g., every 5-15 seconds) for real-time power flow, SoC levels, and grid states. This endpoint is highly optimized for frequent polling.
2. **The Slow Loop** (static/network data): poll static/config endpoints once on application startup, then refresh them on a slow, lazy timer (e.g., every 15-60 minutes), or only when a user clicks a manual "Refresh" button.
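The two pipelines can be sketched as independent asyncio tasks. This is an illustrative pattern, not library code: the stub fetchers below stand in for real calls like `client.get_stats()` (fast) and `client.get_device_info()` (slow), and the intervals are shortened so the sketch runs instantly.

```python
import asyncio

async def fast_loop(fetch_stats, interval_s, iterations, sink):
    # Fast pipeline: real-time telemetry only (get_stats in production)
    for _ in range(iterations):
        sink.append(await fetch_stats())
        await asyncio.sleep(interval_s)

async def slow_loop(fetch_static, interval_s, iterations, cache):
    # Slow pipeline: static/config data, refreshed lazily
    for _ in range(iterations):
        cache.update(await fetch_static())
        await asyncio.sleep(interval_s)

async def main():
    telemetry, static_cache = [], {}

    # Stubs standing in for the real client calls
    async def fake_stats():
        return {"battery_soc": 55}

    async def fake_static():
        return {"serial": "10060006A000000000"}  # fake serial for the sketch

    await asyncio.gather(
        fast_loop(fake_stats, 0.01, 5, telemetry),      # every 5-15 s in production
        slow_loop(fake_static, 0.05, 1, static_cache),  # every 15-60 min in production
    )
    return telemetry, static_cache

telemetry, static_cache = asyncio.run(main())
```

The key design point is that the two loops never share a timer: the slow loop's interval is tuned in minutes, so static endpoints are hit orders of magnitude less often than `get_stats()`.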
## Legacy Field Aliases (Relays)

The cloud API often exposes duplicated attributes via different payload structures. Specifically for hardware relays, the legacy `gridRelayStat`, `oilRelayStat`, and `solarRelayStat` fields (from `get_power_info`) exactly duplicate the `main_sw` array (from `get_device_composite_info` `runtimeData`):

* `gridRelayStat` == `main_sw[0]`
* `oilRelayStat` == `main_sw[1]` (Generator)
* `solarRelayStat` == `main_sw[2]`

Recommendation: always consume the curated `client.get_stats()` → `Stats.current` object, which evaluates and normalizes these aliases automatically under the hood without making excessive API calls.
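A minimal helper makes the alias table concrete. This is an illustrative function, not part of the library; it simply re-labels the `main_sw` array from `runtimeData` with the legacy `get_power_info` names:

```python
def relay_states(main_sw):
    """Re-label the main_sw relay array with the legacy get_power_info names."""
    return {
        "gridRelayStat": main_sw[0],
        "oilRelayStat": main_sw[1],   # generator relay
        "solarRelayStat": main_sw[2],
    }

# e.g. grid relay closed, generator relay open, solar relay closed:
relay_states([1, 0, 1])
# → {'gridRelayStat': 1, 'oilRelayStat': 0, 'solarRelayStat': 1}
```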
## Native Library Cache & Rate Limiting

The franklinwh-cloud library ships with built-in mechanisms to actively combat excessive polling and protect the fragile aGate MQTT boundary. If your integrator tool (such as an admin console) shows thousands of hits to static endpoints over just a few hours, your architecture is circumventing the internal cache boundaries.

**1. The Method TTL Cache (Proactive)**

You can instruct the client to cache expensive, slow-changing static endpoints (like Smart Circuits or BMS data) natively for a fixed TTL. If you call `get_smart_circuits_info()` 10 times in a minute, only 1 actual API request is sent to the cloud.
```python
from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.cache import DEFAULT_CACHE

# Initialize with the library-recommended TTL mapping
client = FranklinWHCloud("YOUR_EMAIL", "YOUR_PASSWORD", cache=DEFAULT_CACHE)

# Or override specific TTLs (seconds)
custom_cache = {
    **DEFAULT_CACHE,
    "get_bms_info": 120,     # cache battery cell voltages for 2 minutes
    "get_device_info": 600,  # cache hardware serials for 10 minutes
}
client = FranklinWHCloud("YOUR_EMAIL", "YOUR_PASSWORD", cache=custom_cache)
```
(Any mutation call, e.g. `set_smart_switch_state`, automatically invalidates its relevant cache slot.)
**2. Stale Data Degradation (Reactive)**

If the upstream FranklinWH Cloud suffers an outage or severely rate-limits your account, you can enable `tolerate_stale_data`. The client will safely serve the last-known-good telemetry rather than throwing hard `TimeoutException`s.

```python
from franklinwh_cloud import FranklinWHCloud

client = FranklinWHCloud(
    email="YOUR_EMAIL",
    password="YOUR_PASSWORD",
    tolerate_stale_data=True,
    stale_cache_ttl=300,  # data is considered "stale but usable" for 5 minutes
)
```

Always check `client.metrics.snapshot()` to audit your background thread discipline.
## Transport Architecture: REST GET vs MQTT Relay

Understanding the two transport paths in this library is essential before writing custom polling code or building your own API layer on top. The two mechanisms have fundamentally different cost and latency characteristics.

### The Two Transport Paths

| Transport | Mechanism | Cost | CloudFront-cacheable | Who uses it |
|---|---|---|---|---|
| REST GET → cloud aggregator | HTTPS GET to `/hes-gateway/terminal/getDeviceCompositeInfo` | Cheapest: cloud assembles response from its DB | ✅ Yes | `get_stats()`, `get_mode()`, `set_mode()` |
| REST GET/POST → cloud REST API | Standard HTTPS to cloud REST endpoints | Low-medium | Partial | `get_gateway_tou_list()`, `get_tou_info()`, `set_mode()` POST |
| MQTT Relay → physical aGate | POST to `/hes-gateway/terminal/sendMqtt` with `cmdType` in body | Higher: cloud relays over MQTT to aGate hardware, awaits response | ❌ No | `get_power_info()` (211), `get_smart_circuits_info()` (311), `_switch_usage()` (353) |
> [!IMPORTANT]
> The MQTT relay path is a physical hardware round-trip. Every `sendMqtt` call crosses: Cloud → AWS IoT → aGate firmware → back. This is not a DB lookup. Overusing MQTT relay calls on fast poll cycles is the primary cause of `DeviceTimeoutException` failures.
### What the Schema 203/, 211/, 311/ Source Prefixes Mean

The `franklinwh-cli schema` command's Source column uses `cmdType`/sub-object notation. These prefixes describe which command produced the field; they are not transport cost labels.
| Source prefix | Transport path | When it fires |
|---|---|---|
| `203/runtimeData`, `203/result` | REST GET → `getDeviceCompositeInfo` | Every `get_stats()` call: the main poll (cheapest) |
| `211/result` | MQTT Relay → `get_power_info()` (cmdType 211) | Only when `get_stats(include_electrical=True)` |
| `311/runtimeData`, `311/sw_data` | MQTT Relay → `_switch_usage()` (cmdType 353) | Only when smart circuits are active (`pro_load[]` non-zero) |
| `derived` | Local computation | Free: no API call |
| `get_tou_info` | REST GET → TOU schedule endpoint | Only when `get_mode()` is in TOU mode |
> [!NOTE]
> `203/runtimeData` is not a `sendMqtt + cmdType 203` call. The private `_status()` method (which does use `sendMqtt` with cmdType 203) is a legacy low-level method not called by `get_stats()`. The `getDeviceCompositeInfo` REST GET is the production path.
### The `get_stats()` Call Tree

```
get_stats()
│
├─ get_device_composite_info() → 1x REST GET → all 203/ fields, cheap + CloudFront-cached
│    └─ result: runtimeData, currentWorkMode, solarHaveVo, deviceStatus, alarms, relays
│
├─ get_operating_mode_name() → dict lookup (OPERATING_MODES const) → FREE
│
├─ [conditional] _switch_usage() → MQTT relay cmdType 353 → only if pro_load[] non-zero
│    └─ triggers: smart circuit SW1/SW2/V2L present and active in runtimeData.pro_load
│
├─ [conditional] get_grid_status() → REST GET → only if relay OPEN or offgridreason set
│    └─ triggers: ~0.1% of normal polls (relay open or off-grid flag)
│
└─ [conditional] get_power_info() → MQTT relay cmdType 211 → only if include_electrical=True
     └─ triggers: caller opts in explicitly (FHAI uses every Nth poll, not every tick)
```
Normal poll (grid-connected, no smart circuits, no electrical): 1 REST GET. That's it.
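The call tree implies a simple per-poll cost model. The function below is an illustrative back-of-envelope helper (not a library API) that counts transport calls for one `get_stats()` cycle under each condition:

```python
def poll_cost(include_electrical=False, smart_circuits_active=False, relay_open=False):
    """Count transport calls for one get_stats() poll, per the call tree above."""
    rest, mqtt = 1, 0             # getDeviceCompositeInfo: always one REST GET
    if smart_circuits_active:
        mqtt += 1                 # _switch_usage(), MQTT relay cmdType 353
    if relay_open:
        rest += 1                 # get_grid_status(), REST GET
    if include_electrical:
        mqtt += 1                 # get_power_info(), MQTT relay cmdType 211
    return {"rest_get": rest, "mqtt_relay": mqtt}

poll_cost()
# → {'rest_get': 1, 'mqtt_relay': 0}   (the normal cheap poll)
poll_cost(include_electrical=True, smart_circuits_active=True)
# → {'rest_get': 1, 'mqtt_relay': 2}
```

Note how every expensive MQTT relay hop is opt-in or state-gated; the default poll never touches the aGate hardware.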
### The composite_hint Pattern: Avoiding Redundant Fetches

When your code calls `get_stats()` and then immediately calls `get_mode()` or `set_mode()`, you are paying for `getDeviceCompositeInfo` twice: once inside `get_stats()` and once inside the mode call. Use `composite_hint` to eliminate the duplicate:
```python
# ❌ Naive pattern: double REST GET
stats = await client.get_stats()  # calls getDeviceCompositeInfo internally
mode = await client.get_mode()    # calls getDeviceCompositeInfo AGAIN

# ✅ Optimised pattern: single REST GET for the mode call
composite = await client.get_device_composite_info()
stats = await client.get_stats()  # NOTE: get_stats() calls composite internally;
                                  # for a shared-cycle optimisation, call composite
                                  # once and pass it to both:
mode = await client.get_mode(composite_hint=composite)

# ✅ Simplest optimised pattern for a combined mode+stats read
composite = await client.get_device_composite_info()
mode = await client.get_mode(composite_hint=composite)
# stats fields available directly in composite["result"]["runtimeData"]
```
The `composite_hint` accepts the full response dict returned by `get_device_composite_info()` (i.e. `{"code": 200, "result": {...}, "message": "success"}`). When provided, `get_mode()` and `set_mode()` skip their internal `getDeviceCompositeInfo` call entirely.
> [!TIP]
> `composite_hint` is keyword-only (after a bare `*`) so it cannot be passed positionally. Existing call sites using positional args are 100% backward-compatible with zero changes.
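The keyword-only boundary is plain Python semantics; a toy signature (not the real method) shows why positional call sites cannot collide with the new parameter:

```python
def get_mode(*, composite_hint=None):
    """Toy stand-in for the real method: the bare * forces keyword-only."""
    return "reused hint" if composite_hint is not None else "fresh fetch"

get_mode()                              # → "fresh fetch"
get_mode(composite_hint={"code": 200})  # → "reused hint"

try:
    get_mode({"code": 200})             # positional: rejected at call time
except TypeError as exc:
    print(f"rejected: {exc}")
```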
### Why You Should Not Write Your Own Transport Layer

The library's transport paths have substantial hardening you would need to replicate:

- **Auto-retry with token refresh:** `instrumented_retry` transparently catches 401/code 10009 responses, negotiates a new JWT via `TokenFetcher`, and replays the original request.
- **MQTT multiplexing guard:** `sendMqtt` calls cannot be concurrent; the aGate MQTT broker cannot multiplex simultaneous requests. The library sequences all `sendMqtt` calls carefully (e.g. `get_bms_info()` type 2 then type 3 are sequential by design).
- **Stale data fallback:** `get_stats()` returns the last-known-good `Stats` object when the cloud returns an empty `result: null` glitch payload: a real observed cloud bug (not a crash).
- **CloudFront edge tracking:** every HTTP response is inspected for `X-Cache`/`x-amz-cf-pop` headers to maintain a live `EdgeTracker` PoP map for diagnostic telemetry.
- **Canary trap:** every response is checked for an unrecognised `softwareVersion` field. If a new firmware version is detected, the payload is dumped to disk for schema diffing.

Reimplementing direct `sendMqtt` or `getDeviceCompositeInfo` calls outside this library means none of the above protections apply to those calls.
## Grid Connection State

> [!IMPORTANT]
> `GridConnectionState` replaces the old `grid_outage: bool` field (removed 2026-04-10). Any integrator reading `stats.current.grid_outage` must migrate to `stats.current.grid_connection_state`.

The `GridConnectionState` enum provides unambiguous, four-state grid reporting covering all real-world FranklinWH site topologies: grid-tied homes, off-grid sites, active outages, and user-initiated simulation tests.
### The Four States

| Value | `.value` (str) | Meaning | When you see it |
|---|---|---|---|
| `CONNECTED` | `"Connected"` | Grid relay CLOSED, utility power available | Normal daily operation |
| `OUTAGE` | `"Outage"` | Firmware detected grid loss (`offGridFlag=1`) | Real grid failure, island mode |
| `NOT_GRID_TIED` | `"NotGridTied"` | Site has no utility connection (permanent island) | Off-grid solar/battery installs |
| `SIMULATED_OFF_GRID` | `"SimulatedOffGrid"` | User-initiated island test | Commissioning, testing, drills |
### Detection Strategy (Zero-Overhead on Normal Systems)

The library derives state from data already fetched by `get_stats()`. No extra API calls are made on a normally-connected system:

```
startup:  get_entrance_info() → gridFlag=False → NOT_GRID_TIED cached forever (never re-checked)

per poll:
  offGridFlag == 1  → OUTAGE     (short-circuit, no extra call)
  main_sw[0] == 1   → CONNECTED  (no extra call; covers 99.9% of polls)
  main_sw[0] == 0     ─┐
  offgridreason != 0  ─┴─→ get_grid_status() → offgridState==1 → SIMULATED_OFF_GRID
                                             → offgridState==0 → OUTAGE
```
> [!NOTE]
> The dual-gate (`main_sw[0]==0 OR offgridreason!=null`) handles a known firmware API reporting lag (~5-10 s) where `offgridreason` is set before `main_sw` updates after a simulated off-grid activation. This is expected vendor behaviour: the grid contactor is a mechanical relay. The library does not retry; it returns the correct state on the first poll where API data is internally consistent.
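The decision tree can be restated as a pure function. This is an illustrative re-derivation of the logic above, not the library's internal code; the argument names are hypothetical flattenings of the raw fields (`gridFlag`, `offGridFlag`, `main_sw[0]`, `offgridreason`, `offgridState`):

```python
def derive_grid_state(grid_flag, off_grid_flag, main_sw0, offgrid_reason,
                      offgrid_state=None):
    """Mirror the detection tree; returns the .value string of each state."""
    if not grid_flag:
        return "NotGridTied"        # cached once at startup, never re-checked
    if off_grid_flag == 1:
        return "Outage"             # short-circuit, no extra call
    if main_sw0 == 1 and offgrid_reason == 0:
        return "Connected"          # covers ~99.9% of polls, no extra call
    # main_sw[0]==0 or offgridreason set: one get_grid_status() round-trip
    return "SimulatedOffGrid" if offgrid_state == 1 else "Outage"

derive_grid_state(True, 0, 1, 0)                   # → "Connected"
derive_grid_state(True, 0, 0, 2, offgrid_state=1)  # → "SimulatedOffGrid"
derive_grid_state(False, 0, 1, 0)                  # → "NotGridTied"
```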
### Basic Usage

```python
import asyncio

from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.models import GridConnectionState

async def main():
    client = FranklinWHCloud.from_config("franklinwh.ini")
    await client.login()
    await client.select_gateway()

    stats = await client.get_stats()
    state = stats.current.grid_connection_state

    # .value gives the display string directly
    print(f"Grid: {state.value}")
    # Output: "Connected" | "Outage" | "SimulatedOffGrid" | "NotGridTied"

    # Exact identity check
    if state == GridConnectionState.CONNECTED:
        print("✅ Grid available: normal operation")
    elif state == GridConnectionState.OUTAGE:
        print("❌ Grid outage: running on battery + solar")
    elif state == GridConnectionState.SIMULATED_OFF_GRID:
        print("⚡ Simulated off-grid: user-initiated island test")
    elif state == GridConnectionState.NOT_GRID_TIED:
        print("🏝️ Off-grid site: no utility connection")

asyncio.run(main())
```
### State-Gated Automation (Safe Pattern)

Gate grid-dependent actions behind `CONNECTED`. This prevents dispatching grid charges or exports during outages, simulations, or on off-grid sites:
```python
from franklinwh_cloud.models import GridConnectionState
from franklinwh_cloud.const import EMERGENCY_BACKUP

stats = await client.get_stats()
state = stats.current.grid_connection_state

if state == GridConnectionState.CONNECTED:
    # Safe to: import from grid, export to grid, run TOU schedules
    await client.set_tou_schedule(touMode="SELF")
elif state == GridConnectionState.OUTAGE:
    # Grid is down: switch to Emergency Backup to maximise reserve
    await client.set_mode(EMERGENCY_BACKUP, soc=100)
    print("🚨 Grid outage detected: Emergency Backup activated")
elif state == GridConnectionState.SIMULATED_OFF_GRID:
    # User is running an island test: take no automated action
    print("⚡ Simulation active: skipping scheduled dispatch")
elif state == GridConnectionState.NOT_GRID_TIED:
    # Permanent off-grid site: grid-dependent schedules never apply
    print("🏝️ Off-grid site: TOU schedule skipped")
```
### Dashboard / Status Display

```python
from franklinwh_cloud.models import GridConnectionState

# Colour-coded terminal output (ANSI)
_COLORS = {
    GridConnectionState.CONNECTED: "\033[32mConnected\033[0m",           # green
    GridConnectionState.OUTAGE: "\033[31mOutage\033[0m",                 # red
    GridConnectionState.SIMULATED_OFF_GRID: "\033[33mSimulated\033[0m",  # yellow
    GridConnectionState.NOT_GRID_TIED: "\033[36mNot Grid-Tied\033[0m",   # cyan
}

state = stats.current.grid_connection_state
print(f"Grid: {_COLORS.get(state, state.value)}")

# JSON / MQTT telemetry payload: .value is always a str
payload = {
    "grid_status": state.value,                         # "Connected" etc.
    "grid_ok": state == GridConnectionState.CONNECTED,  # bool shortcut
    "solar_kw": stats.current.solar_production,
    "battery_soc": stats.current.battery_soc,
}
```
### FHAI / Home Assistant Integration

The FHAI gateway service receives the flattened `Current` dataclass as a dict. `dataclasses.asdict()` serialises the enum to its `.value` string automatically:

```python
import dataclasses

stats = await client.get_stats()
d = dataclasses.asdict(stats.current)

# grid_connection_state is now the .value string in the dict
grid_status = d.get("grid_connection_state", "Connected")
# → "Connected" | "Outage" | "SimulatedOffGrid" | "NotGridTied"

status_payload = {
    "grid_status": grid_status,  # forward directly to HA sensor
    # ... other fields
}
```
> [!TIP]
> FHAI handoff note: `grid_connection_state` is always a string after `asdict()`. No bool checks, no `if grid_outage` branches. Map each value to a Home Assistant sensor state directly, e.g. HA `state_class: measurement` with an `options` list.
### Live Integration Test

A destructive live test verifying the complete state cycle is included:

```shell
# Requires: franklinwh.ini with real credentials, SOC ≥ reserve + 10%
pytest -m "live and destructive" tests/test_live.py::TestLiveGridConnectionState -s -v
```

Pre-flight checks enforced: SOC margin, current connection state, terminal `yes` confirmation. Guarantees: `try/finally` restore, poll-loop on reconnect (30 s timeout).
## Operating Mode & Run Status: Field Guide

> [!WARNING]
> **FranklinWH Naming Collision.** The `runtimeData` payload contains two similarly-named integer fields with completely different semantics. Previous agents got this wrong multiple times. Read this section carefully before writing any mode-display or VPP detection logic.
### The Two Raw Fields: What They Actually Mean

The raw API response (`getDeviceCompositeInfo`, cmdType 203, `result.runtimeData`) includes:

| Raw API key | Python field | Semantics | Live examples |
|---|---|---|---|
| `runtimeData.run_status` | `run_status` (int) | RUN_STATUS key: what the battery hardware is physically doing. Maps directly to the `RUN_STATUS` dict. | 1=Charging, 2=Discharging |
| `runtimeData.mode` | `tou_mode` (int) | Programme/schedule ID: an arbitrary backend ID for the active programme. NOT a RUN_STATUS key. VPP happens to use ID 9; normal TOU programmes use large IDs like 29287. | 9=VPP, 29287=Ausgrid EA11 TOU |
| `runtimeData.name` | `tou_mode_desc` (str) | Programme label: the human-readable name of the active programme. Often empty during VPP; always fall back to `RUN_STATUS[9]` in that case. | `"Ausgrid EA11 TOU"`, `""` |
> [!CAUTION]
> The key mistake to avoid: `runtimeData.mode` is not a `RUN_STATUS` key. Doing `RUN_STATUS[runtimeData.mode]` will return `"Unknown"` for any normal TOU programme (e.g. 29287). Only `runtimeData.run_status` maps to `RUN_STATUS`. FranklinWH's naming is genuinely misleading.
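A defensive lookup makes the distinction mechanical. The `RUN_STATUS` values here are an abridged copy of the table from `franklinwh_cloud.const`; the point is that only `run_status` is a valid key, and `.get()` with a default keeps a wrong index from raising:

```python
RUN_STATUS = {
    0: "Standby", 1: "Charging", 2: "Discharging",
    5: "Off-Grid Standby", 6: "Off-Grid Charging", 7: "Off-Grid Discharging",
    8: "Debug Mode", 9: "VPP mode",
}

run_status = 2        # runtimeData.run_status: a real RUN_STATUS key
programme_id = 29287  # runtimeData.mode: a backend programme ID, NOT a key

RUN_STATUS.get(run_status, "Unknown")    # → "Discharging" (correct field)
RUN_STATUS.get(programme_id, "Unknown")  # → "Unknown" (the classic mistake)
```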
### RUN_STATUS Value Table (keyed by `runtimeData.run_status`, not `runtimeData.mode`)

```python
from franklinwh_cloud.const import RUN_STATUS

# RUN_STATUS = {
#     0: "Standby",               # Inactive / idle
#     1: "Charging",
#     2: "Discharging",
#     3: "Unknown 3",             # Reserved
#     4: "Unknown 4",             # Reserved
#     5: "Off-Grid Standby",      # Island mode: battery idle, no grid
#     6: "Off-Grid Charging",     # Island mode: solar charging battery
#     7: "Off-Grid Discharging",  # Island mode: battery powering home
#     8: "Debug Mode",            # Franklin Remote Support session active
#     9: "VPP mode",              # Virtual Power Plant: utility/aggregator controlled
# }
```
### The Three Derived Fields on `stats.current`

| Python field | Derived from | What it shows |
|---|---|---|
| `run_status_desc` | `RUN_STATUS[runtimeData.run_status]` | What the battery is physically doing: "Charging", "Discharging", "Standby" |
| `tou_mode_desc` | `runtimeData.name` (raw) | Active programme label from the cloud: "Ausgrid EA11 TOU", `""` during VPP |
| `effective_mode` | derived (priority order below) | App-matching dominant mode label: the single best label to show users |

`effective_mode` priority order:

1. `tou_mode_desc` if non-empty, e.g. "Ausgrid EA11 TOU" (covers named programmes)
2. `RUN_STATUS[9]` = "VPP mode" if `tou_mode == 9` and the name is empty (VPP fallback)
3. `work_mode_desc`: "Time-Of-Use", "Self-Consumption", "Emergency Backup"
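The priority order is easy to restate as a pure function. This is an illustrative re-derivation (the library pre-computes it as `stats.current.effective_mode`); treat it as documentation, not the actual implementation:

```python
def effective_mode(tou_mode_desc, tou_mode, work_mode_desc):
    """Re-derive the dominant mode label per the priority order above."""
    if tou_mode_desc:          # 1. named programme wins
        return tou_mode_desc
    if tou_mode == 9:          # 2. VPP fallback when the name is empty
        return "VPP mode"
    return work_mode_desc      # 3. base operating mode

effective_mode("Ausgrid EA11 TOU", 29287, "Time-Of-Use")  # → "Ausgrid EA11 TOU"
effective_mode("", 9, "Self-Consumption")                 # → "VPP mode"
effective_mode("", 29287, "Self-Consumption")             # → "Self-Consumption"
```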
### VPP Mode (Virtual Power Plant)

VPP mode means your gateway is under utility/aggregator control. The gateway dispatches autonomously in response to external signals; your personal TOU schedule is suspended.
Detection:

```python
stats = await client.get_stats()
cur = stats.current

# Recommended: use tou_mode (programme ID 9 = VPP)
is_vpp = cur.tou_mode == 9

# Or check the pre-derived effective_mode label
is_vpp = cur.effective_mode == "VPP mode"
```
What the FranklinWH mobile app shows during VPP:

```
● Discharging  ← runtimeData.run_status → RUN_STATUS[2]  (hardware action)
⚙ VPP Mode     ← runtimeData.name (or RUN_STATUS[9] fallback when name is empty)
```

The bullet-dot (●) line = what the battery is doing. The gear (⚙) line = what controls it.
### Recommended Display Recipe

```python
stats = await client.get_stats()
cur = stats.current

# Single dominant label: matches the app's top-card display
print(f"Mode: {cur.effective_mode}")
# → "Ausgrid EA11 TOU" | "VPP mode" | "Time-Of-Use" | "Self-Consumption"

# Hardware action: what the battery is physically doing right now
print(f"Action: {cur.run_status_desc}")
# → "Charging" | "Discharging" | "Standby"
```
Exactly reproducing the app's two-line display:

```python
from franklinwh_cloud.const import RUN_STATUS

cur = stats.current

# Line 1 (● bullet): hardware physical action
print(f"● {cur.run_status_desc}")  # RUN_STATUS[runtimeData.run_status]

# Line 2 (⚙ gear): controlling programme
print(f"⚙ {cur.effective_mode}")   # tou_mode_desc | "VPP mode" | work_mode_desc
```
### Home Assistant / MQTT Payload

```python
stats = await client.get_stats()
cur = stats.current

payload = {
    # Primary display sensor: matches the app's dominant label
    "effective_mode": cur.effective_mode,
    # → "Ausgrid EA11 TOU" | "VPP mode" | "Time-Of-Use" | "Self-Consumption"

    # Hardware action sensor: what the battery is physically doing
    "hardware_action": cur.run_status_desc,
    # → "Charging" | "Discharging" | "Standby"

    # Base operating mode (unchanged by VPP)
    "work_mode": cur.work_mode_desc,  # "Time-Of-Use" | "Self-Consumption" | "Emergency Backup"
    "work_mode_int": cur.work_mode,   # 1=TOU, 2=Self, 3=EmgBkp

    # Raw programme ID: useful for automation triggers
    "programme_id": cur.tou_mode,          # runtimeData.mode: arbitrary int (9=VPP, 29287=Ausgrid etc.)
    "programme_label": cur.tou_mode_desc,  # runtimeData.name: may be empty

    # Hardware state int: for history graphs
    "run_status_int": cur.run_status,  # runtimeData.run_status: RUN_STATUS key

    # Convenience flag
    "vpp_active": cur.tou_mode == 9,  # bool
}
```
> [!TIP]
> When `tou_mode_desc` is empty (common during VPP), the library automatically falls back to `RUN_STATUS[9] = "VPP mode"` in `effective_mode`. You never need to null-check `tou_mode_desc` yourself; just read `effective_mode`.
## Roles & Responsibilities: Integrator vs Library

When integrating this library into an end-user application (like Home Assistant), you must maintain a strict conceptual boundary between what the library does and what your application is responsible for.

### The Facade Pattern

The franklinwh-cloud library is a rigid facade. Its only job is to abstract away the undocumented, unstable, and volatile FranklinWH cloud endpoints so you never have to care if they rename an internal variable tomorrow. The library guarantees that `stats.current.grid_relay` will always be available, regardless of how many endpoints it had to secretly query to construct that value.
### Accessory Impact & Upstream Limitations

The FranklinWH Cloud API is heavily unoptimized for systems with optional accessories. If an aGate has a generator mapped or V2L enabled, the cloud backend physically requires the library to query secondary, non-cached endpoints (like cmdType 211) to assemble a complete telemetry snapshot.

This doubles or triples the API footprint per refresh tick. It is an upstream cloud limitation, not a library deficiency.

As the integrator/app developer, it is your responsibility to inform your users of the performance hit. Your application should proactively warn them: "Because you have a generator installed, your telemetry requires multiple API cycles; you may experience higher latency or rate limiting if poll rates are set too aggressively." Do not attempt to force the library to mask upstream latency.
### Infinite Session Persistence (Transparent Auto-Renewal)

Users of the official FranklinWH mobile app often experience "Idle Timeouts" or "Session Expired - Please log back in" prompts. The official app pushes the burden of session management onto the user.

By design, the franklinwh-cloud library sidesteps this entirely. The library embeds an `instrumented_retry` loop at the core HTTP boundaries. If the cloud servers invalidate the JWT token (HTTP 401 or code 10009), the library silently catches the rejection, automatically negotiates a new token via the `TokenFetcher`, and replays the original API request identically.

Downstream clients (like Home Assistant) therefore benefit from infinite session persistence and will never receive an expired-session exception unless the underlying master credentials have been permanently revoked.
If your integration requires auditing these transparent rotations natively (e.g., to draw a "Session Uptime" metric on a dashboard), you can easily poll the built-in tracking metric:
```python
# Returns elapsed seconds since the last silent JWT refresh,
# or None if the original token is still valid.
s_ago = client.get_metrics().get("last_token_refresh_s_ago")
if s_ago is not None:
    print(f"Library transparently negotiated a fresh token {s_ago} seconds ago.")
```
## Connection Preamble

All recipes start by establishing an authenticated session and binding a physical aGate serial number.

### Modern Transparent Auth (The Future)

The modern `Client` boundary provides absolute control over the emulation footprint (e.g., passing a specific `emulate_app_version` API header) and decouples the authentication lifecycle from the command executor. This is the recommended approach for all new integrations.
```python
import asyncio

from franklinwh_cloud.auth import PasswordAuth
from franklinwh_cloud.client import Client

async def main():
    # 1. Fetch token and dictate the exact mobile emulation string
    auth = PasswordAuth("user@example.com", "secret", emulate_app_version="APP2.11.0")
    await auth.get_token()

    # 2. Bind the active session to a specific physical aGate
    #    (required for multi-aGate environments!)
    gateway_serial = "10060006AXXXXXXXXX"
    client = Client(auth, gateway=gateway_serial, emulate_app_version="APP2.11.0")

    # ... your recipe code here ...
    stats = await client.get_stats()
    print(f"Battery SoC: {stats.current.battery_pct}%")

asyncio.run(main())
```
### Legacy Wrapper (Single aGate Happy Path)

If you have an older script or only manage a single aGate on your account, the legacy `FranklinWHCloud` orchestrator will automatically resolve your credentials and auto-discover the serial number for you.
```python
import asyncio

from franklinwh_cloud import FranklinWHCloud

async def main():
    # Will automatically fetch CLI or .ini credentials if omitted
    client = FranklinWHCloud(email="user@example.com", password="secret")
    await client.login()
    await client.select_gateway()  # natively fetches and binds the first gateway it finds

    # ... your recipe code here ...
    stats = await client.get_stats()
    print(f"Battery SoC: {stats.current.battery_pct}%")

asyncio.run(main())
```
### Multi-aGate Discovery (Account-Level APIs)

If you manage multiple gateways on a single account and don't know their serial numbers, you MUST discover them before pushing commands. By substituting a temporary proxy client, you can securely execute the `get_home_gateway_list()` account API before touching hardware.
```python
import asyncio

from franklinwh_cloud.auth import PasswordAuth
from franklinwh_cloud.client import Client

async def main():
    auth = PasswordAuth("user@example.com", "secret")

    # 1. Instantiate a proxy client to unlock account-level APIs
    proxy = Client(auth, "placeholder")
    gateways_raw = await proxy.get_home_gateway_list()

    # 2. Iterate and bind explicitly
    for gw in gateways_raw.get("result", []):
        serial = gw.get("id")
        print(f"\n--- Binding to aGate {serial} ---")

        # 3. Create a dedicated client purely for this physical aGate
        agate_client = Client(auth, gateway=serial)

        # Now hardware calls are safely routed to this target
        stats = await agate_client.get_stats()
        print(f"[{serial}] Battery SoC: {stats.current.battery_pct}%")

asyncio.run(main())
```
### Gateway Groups: get_home_gateway_list() Group Fields

`get_home_gateway_list()` is the only API that returns group membership. The `get_site_and_device_info()` endpoint does NOT include group data; it only flat-lists gateways under a site.
Raw response fields (per gateway entry):

| Field | Type | Meaning |
|---|---|---|
| `groupId` | `str \| null` | UUID/int of the group. `null` = ungrouped |
| `groupName` | `str \| null` | Human label ("Main House"). `null` = ungrouped |
| `groupFlag` | `int` | `1` = this gateway belongs to a group; `0` = ungrouped |
> [!IMPORTANT]
> `groupFlag=0` is the authoritative "ungrouped" sentinel; do not infer grouping from `groupId != null` alone. Always check `groupFlag == 1` first. Single-gateway accounts will have `groupId: null, groupName: null, groupFlag: 0`.
Pattern: building a group-aware account topology:
```python
gateways_raw = await proxy.get_home_gateway_list()
gateways = gateways_raw.get("result", [])

# Only show the group tier when at least one gateway is grouped
has_groups = any(gw.get("groupFlag") == 1 for gw in gateways)

# Build group buckets
group_buckets = {}  # groupId (or None) → [gateway_dict, ...]
group_names = {}    # groupId → groupName
for gw in gateways:
    grp = gw.get("groupId") if gw.get("groupFlag") == 1 else None
    group_buckets.setdefault(grp, []).append(gw)
    if grp and grp not in group_names:
        group_names[grp] = gw.get("groupName") or f"Group {grp}"

if has_groups:
    for grp_id, members in group_buckets.items():
        label = group_names.get(grp_id, "(ungrouped)")
        print(f"Group: {label} (GroupId: {grp_id})")
        for gw in members:
            print(f"  └── {gw['name']} ({gw['id']})")
else:
    # Single-gateway or all ungrouped: no group header needed
    for gw in gateways:
        print(f"└── {gw['name']} ({gw['id']})")
```
Result for a two-group account:

```
Group: "Main House" (GroupId: 501)
  └── FHP1 (10060006AXXXXXXXXX)
  └── FHP2 (10060006AXXXXXXXXX)
Group: (ungrouped)
  └── FHP3 (10060006AXXXXXXXXX)
```
> [!NOTE]
> Groups are a display/organisation concept only; they have no effect on API routing or command scoping. Each gateway always requires its own `Client(auth, gateway=serial)` instance regardless of group membership.
### Custom Client Identity (HTTP Headers)

By default, the library sends a generic `franklinwh-cloud-client` User-Agent. If you are building an integration (e.g., a Home Assistant add-on or custom dashboard), you can declare your identity to FranklinWH's servers:
```python
from franklinwh_cloud import FranklinWHCloud

custom_headers = {
    "User-Agent": "HomeAssistant-Addon/1.0.0",
    "X-Client-Version": "1.0.0",
}

client = FranklinWHCloud(
    email="user@example.com",
    password="secret",
    client_headers=custom_headers,
)
```
Or load from config file:
## Quick Reference

One-liner recipes. All assume `client` is connected (see the preamble above).

### Power Flow & Status
```python
from franklinwh_cloud.models import GridConnectionState

stats = await client.get_stats()

# Instantaneous power (kW)
solar_kw = stats.current.solar_production    # p_sun
battery_kw = stats.current.battery_use       # p_fhp (negative = charging)
grid_kw = stats.current.grid_use             # p_uti
home_kw = stats.current.home_load            # p_load
soc = stats.current.battery_soc              # battery %

# Operating state
mode_name = stats.current.work_mode_desc     # "Self Consumption"
run_status = stats.current.run_status_desc   # "Normal operation"

# Grid connection state (four-state enum; see the GridConnectionState section above)
grid_state = stats.current.grid_connection_state  # GridConnectionState.CONNECTED
grid_label = grid_state.value                     # "Connected" / "Outage" / ...
grid_ok = grid_state == GridConnectionState.CONNECTED

# Daily totals (kWh)
solar_kwh = stats.totals.solar                # kwh_sun
grid_in_kwh = stats.totals.grid_import        # kwh_uti_in
grid_out_kwh = stats.totals.grid_export       # kwh_uti_out
bat_chg_kwh = stats.totals.battery_charge     # kwh_fhp_chg
bat_dis_kwh = stats.totals.battery_discharge  # kwh_fhp_di
home_kwh = stats.totals.home_use              # kwh_load
```
### Operating Mode & System Limiters (Global SOC Configs)

The `getGatewayTouListV2` endpoint serves not just TOU schedules; it acts as a global Operating Mode Configuration blob. Recent updates have exposed several programmatic SOC (State of Charge) limiters:

- `soc`: the global backup reserve buffer (Emergency Reserve).
- `maxSoc`: the maximum allowed charging limit (e.g., protecting degraded cells).
- `complianceSoc`: the regulatory/utility-mandated minimum reserve.
- `dischargeDepthSoc`: the lowest permissible physical discharge limit (often 5% or 10%).
```python
from franklinwh_cloud.const import (
    TIME_OF_USE, SELF_CONSUMPTION, EMERGENCY_BACKUP,  # 1, 2, 3
)

# 1. View global configurations (including SOC parameters)
config = await client.get_tou_dispatch_detail()
print(f"Current Reserve SOC: {config.get('soc')}%")
print(f"Maximum Charge Limit (maxSoc): {config.get('maxSoc', 'Unrestricted')}%")
print(f"Compliance Reserve (complianceSoc): {config.get('complianceSoc', 'None')}%")
print(f"Discharge Depth Threshold: {config.get('dischargeDepthSoc', 'None')}%")

# 2. Get current mode specifically
mode = await client.get_mode()
print(f"Mode: {mode['modeName']}, Run: {mode['run_desc']}")

# 3. Get reserve SoC for all modes
soc_all = await client.get_all_mode_soc()
# Returns: [{'workMode': 1, 'name': 'Time of Use', 'soc': 7.0, ...}, ...]

# 4. Switch to Self-Consumption, keep current SoC
await client.set_mode(SELF_CONSUMPTION, None, None, None, None)
#                     workMode=2       soc   forever nextMode duration

# 5. Switch to Emergency Backup, indefinite
await client.set_mode(EMERGENCY_BACKUP, None, 1, SELF_CONSUMPTION, None)
#                     workMode=3       soc  forever=1 nextMode=2 duration

# 6. Switch to Emergency Backup for 2 hours, then revert to Self-Consumption
await client.set_mode(EMERGENCY_BACKUP, None, 2, SELF_CONSUMPTION, 120)
#                     workMode=3       soc  timed=2 nextMode=2   mins

# 7. Update backup reserve SoC to 20% for Self-Consumption mode
await client.update_soc(requestedSOC=20, workMode=SELF_CONSUMPTION)
#                                        workMode=2
```
TOU Scheduling¶
[!CAUTION] Every `set_tou_schedule` / `set_tou_schedule_multi` call is DESTRUCTIVE. `saveTouDispatch` validates, saves, AND switches the gateway to TOU mode in one atomic call. There is no "save without activating" path. Always back up the current schedule with `get_tou_dispatch_detail()` before dispatching so you can restore it on completion.
from franklinwh_cloud.const import (
dispatchCodeType, # SELF=6, GRID_CHARGE=8, GRID_EXPORT=7, STANDBY=2 ...
WaveType, # OFF_PEAK=0, MID_PEAK=1, ON_PEAK=2, SUPER_OFF_PEAK=4
)
# ── View current schedule ──────────────────────────────────────────────────
schedule = await client.get_tou_dispatch_detail()
# Returns raw API envelope: {"code": 200, "result": {"template": {...}, "strategyList": [...]}}
strategy_list = schedule.get("result", {}).get("strategyList", [])
# ── Set full-day self-consumption ──────────────────────────────────────────
await client.set_tou_schedule(touMode="SELF")
Example 1 β Single Season: Grid Charge for 1 Hour¶
Use this pattern when you want to force a charge window for the current active
season. The library resolves today's date to the correct season automatically
and leaves all other seasons untouched. The rest of the 24-hour day is filled
with default_mode automatically.
# ── Set 1-hour grid charge window for today's active season ───────────────
await client.set_tou_schedule(
touMode="CUSTOM",
touSchedule=[{
"name": "Grid Charge",
"startHourTime": "01:00",
"endHourTime": "02:00",
"dispatchId": dispatchCodeType.GRID_CHARGE.value, # 8 = charge from solar/grid
"waveType": WaveType.OFF_PEAK.value, # 0 = off-peak rate tier
"gridChargeMax": 5000, # Watts β cap at 5 kW
}],
default_mode="SELF", # Rest of day = self-consumption (dispatchId=6)
day_type=3, # 3 = everyDay (weekday + weekend)
# month=4, # Optional: target April's season explicitly
)
# ── Caller is responsible for backup/restore if needed ─────────────────────
# Backup: original = (await client.get_tou_dispatch_detail())["result"]["strategyList"]
# Restore: await client.set_tou_schedule_multi(original)
Dispatch codes confirmed from mobile app HAR captures:

| dispatchId | Code | Meaning | Use when |
|---|---|---|---|
| 1 | F | aPower to home | Battery powers home; solar exports |
| 2 | B | aPower on standby | Battery idles; solar/grid handles home |
| 3 | E | aPower charges from solar | Solar charges battery; grid covers home |
| 6 | D | Self-consumption | Normal: solar → battery → grid priority |
| 7 | H | aPower to home/grid | Battery discharges to home + exports excess |
| 8 | G | aPower charges from solar/grid | Force charge from both solar and grid |
[!WARNING] FranklinWH has changed `dispatchId` numbering in past API updates. Always verify against a live `franklinwh-cli tou --dispatch` output before hardcoding IDs. The `Code` letter (B/D/E/F/G/H) is a secondary cross-reference from `getCustomEnergyDispatchList`.
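Because the numbering has shifted before, it can pay to keep the table above as data and validate IDs at runtime instead of scattering literals through scripts. A minimal, library-free sketch (the values mirror the HAR-captured table; re-verify both against a live capture):

```python
# dispatchId -> (letter code, meaning), per the HAR-captured table above.
DISPATCH_CODES = {
    1: ("F", "aPower to home"),
    2: ("B", "aPower on standby"),
    3: ("E", "aPower charges from solar"),
    6: ("D", "Self-consumption"),
    7: ("H", "aPower to home/grid"),
    8: ("G", "aPower charges from solar/grid"),
}

def require_dispatch_id(dispatch_id: int) -> int:
    """Raise early instead of letting the cloud reject an unknown id."""
    if dispatch_id not in DISPATCH_CODES:
        raise ValueError(
            f"dispatchId {dispatch_id} not in known set {sorted(DISPATCH_CODES)}")
    return dispatch_id

require_dispatch_id(8)      # OK
# require_dispatch_id(99)   # would raise ValueError
```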
Example 2 β Multi-Season: Weekday/Weekend Split¶
Use set_tou_schedule_multi() when you need to write a full multi-season
TOU schedule with different weekday and weekend profiles per season.
[!NOTE] This is also the correct restore path after a dispatch. Save the full `strategyList` before dispatching and call `set_tou_schedule_multi` to restore it exactly, preserving season names, month ranges, and day-type structures.
# ── Build a two-season weekday/weekend schedule ───────────────────────────
summer_weekday_blocks = [
{"name": "Off-peak", "startHourTime": "00:00", "endHourTime": "07:00",
"dispatchId": 6, "waveType": 0}, # Self-consumption overnight
{"name": "Charge", "startHourTime": "07:00", "endHourTime": "10:00",
"dispatchId": 8, "waveType": 0}, # Charge from cheap morning solar
{"name": "On-peak", "startHourTime": "17:00", "endHourTime": "21:00",
"dispatchId": 7, "waveType": 2}, # Discharge to grid during peak
{"name": "Off-peak", "startHourTime": "21:00", "endHourTime": "24:00",
"dispatchId": 6, "waveType": 0}, # Self-consumption late night
]
summer_weekend_blocks = [
{"name": "Off-peak", "startHourTime": "00:00", "endHourTime": "10:00",
"dispatchId": 6, "waveType": 0}, # Sleep in β self-consumption
{"name": "Solar", "startHourTime": "10:00", "endHourTime": "17:00",
"dispatchId": 3, "waveType": 0}, # Charge from daytime solar
{"name": "On-peak", "startHourTime": "17:00", "endHourTime": "21:00",
"dispatchId": 7, "waveType": 2}, # Discharge to grid during peak
{"name": "Off-peak", "startHourTime": "21:00", "endHourTime": "24:00",
"dispatchId": 6, "waveType": 0},
]
winter_blocks = [
{"name": "Off-peak", "startHourTime": "00:00", "endHourTime": "06:00",
"dispatchId": 8, "waveType": 0}, # Cheap overnight grid charge
{"name": "Self", "startHourTime": "06:00", "endHourTime": "16:00",
"dispatchId": 6, "waveType": 0}, # Self-consumption during day
{"name": "Peak", "startHourTime": "16:00", "endHourTime": "21:00",
"dispatchId": 7, "waveType": 2}, # Peak export
{"name": "Off-peak", "startHourTime": "21:00", "endHourTime": "24:00",
"dispatchId": 6, "waveType": 0},
]
strategy_list = [
{
"id": None,
"seasonName": "Summer",
"month": "10,11,12,1,2,3", # Oct–Mar
"templateId": None,
"dayTypeVoList": [
{
"dayName": "weekDay", "dayType": 1, # 1 = MonβFri
"detailVoList": summer_weekday_blocks,
"eleticRateValley": 0.08, "eleticSellValley": 0.05,
"eleticRatePeak": None, "eleticRateSharp": None,
"eleticRateShoulder": None, "eleticRateSuperOffPeak": None,
"eleticSellPeak": None, "eleticSellSharp": None,
"eleticSellShoulder": None, "eleticSellSuperOffPeak": None,
"eleticRateGridFee": None,
},
{
"dayName": "weekendDay", "dayType": 2, # 2 = SatβSun
"detailVoList": summer_weekend_blocks,
"eleticRateValley": 0.08, "eleticSellValley": 0.05,
"eleticRatePeak": None, "eleticRateSharp": None,
"eleticRateShoulder": None, "eleticRateSuperOffPeak": None,
"eleticSellPeak": None, "eleticSellSharp": None,
"eleticSellShoulder": None, "eleticSellSuperOffPeak": None,
"eleticRateGridFee": None,
},
],
},
{
"id": None,
"seasonName": "Winter",
"month": "4,5,6,7,8,9", # Apr–Sep
"templateId": None,
"dayTypeVoList": [
{
"dayName": "everyDay", "dayType": 3, # 3 = all days
"detailVoList": winter_blocks,
"eleticRateValley": 0.06, "eleticSellValley": 0.03,
"eleticRatePeak": None, "eleticRateSharp": None,
"eleticRateShoulder": None, "eleticRateSuperOffPeak": None,
"eleticSellPeak": None, "eleticSellSharp": None,
"eleticSellShoulder": None, "eleticSellSuperOffPeak": None,
"eleticRateGridFee": None,
},
],
},
]
await client.set_tou_schedule_multi(strategy_list)
[!TIP] Reading back what you wrote: immediately after `set_tou_schedule_multi`, call `franklinwh-cli tou` or `get_tou_dispatch_detail()` to verify the gateway received the schedule. The gateway cloud sync takes ~30–60 seconds. Dispatch code reference: see the Dispatch Code Reference section below.
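The read-back comparison itself can be done without the library. A sketch that diffs the blocks you submitted against what the gateway reports (field names follow the examples in this document; adjust to the actual payload):

```python
def schedule_drift(submitted, readback,
                   keys=("startHourTime", "endHourTime", "dispatchId")):
    """Return a list of human-readable differences between two block lists."""
    diffs = []
    if len(submitted) != len(readback):
        diffs.append(f"block count: sent {len(submitted)}, got {len(readback)}")
    for i, (a, b) in enumerate(zip(submitted, readback)):
        for k in keys:
            if a.get(k) != b.get(k):
                diffs.append(f"block {i} {k}: sent {a.get(k)!r}, got {b.get(k)!r}")
    return diffs

sent = [{"startHourTime": "01:00", "endHourTime": "02:00", "dispatchId": 8}]
got  = [{"startHourTime": "01:00", "endHourTime": "02:30", "dispatchId": 8}]
print(schedule_drift(sent, got))  # one endHourTime mismatch
```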
Power Control (PCS)¶
from franklinwh_cloud.models import GridStatus, GridConnectionState
# Get current grid import/export limits
pcs = await client.get_power_control_settings()
# Set grid export max to 5 kW, import unlimited
await client.set_power_control_settings(
globalGridDischargeMax=5.0, # export limit kW (-1=unlimited, 0=disabled)
globalGridChargeMax=-1, # import limit kW (-1=unlimited, 0=disabled)
)
# Go off-grid (simulate outage - opens grid contactor)
# NOTE: this changes grid_connection_state -> SIMULATED_OFF_GRID
await client.set_grid_status(GridStatus.OFF, soc=5)
# GridStatus.OFF=2 minimum SoC before auto-restore
# Restore grid connection
await client.set_grid_status(GridStatus.NORMAL)
# GridStatus.NORMAL=0
Devices & BMS¶
# Get aGate info (firmware, serial)
agate = await client.get_agate_info()
# Get aPower battery info (capacity, serial)
apower = await client.get_apower_info()
# Get BMS cell data for a specific battery
bms = await client.get_bms_info("APOWER_SERIAL_NUMBER")
# Get smart circuit states
circuits = await client.get_smart_circuits_info()
# Toggle smart switch 1 ON, switch 2 OFF, switch 3 unchanged
await client.set_smart_switch_state((True, False, None))
# Get relay states + grid voltage/current/frequency
power_info = await client.get_power_info()
# Device discovery β structured snapshot of entire system
snapshot = await client.discover(tier=2) # tier 1=basic, 2=verbose, 3=pedantic
print(f"aGate: {snapshot.agate.model}, aPowers: {snapshot.batteries.count}")
LED Strip¶
# Get current LED settings
led = await client.led_light_settings(mode="1", dataArea={})
# Turn LED on with colour and brightness (aPower 2/S)
await client.led_light_settings(mode="2", dataArea={
"lightStat": 2, # 1=Off, 2=On
"rgb": "FF6600", # Hex colour
"bright": 80, # Brightness 0-100
"timeEn": 1, # 0=No schedule, 1=Schedule enabled
"lightOpenTime": "06:00",
"lightCloseTime": "22:00",
})
Smart Assistant (AI)¶
# Get example queries
examples = await client.smart_assistant(requestType="1") # 1=list examples
# Ask a question
answer = await client.smart_assistant(requestType="2", query="What is my battery level?")
# requestType="2" # 2=ask question
print(answer)
⚠️ Some AI commands may only execute on the mobile app.
Monitoring Network Connectivity¶
Instead of relying solely on tracking network_connection via get_stats(), you can pull a definitive snapshot of the active connections and their IPs using get_connectivity_overview().
[!TIP] Best Practice for API Clients / UI Dashboards: To minimize polling overhead on the hardware, call the default (essential) view periodically (e.g. every 5 minutes / on startup). Only pass `deep_scan=True` if you explicitly need to re-verify the SPAN integration or ping the local Modbus TCP `502` port.
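The anti-pattern note at the top of this cookbook asks for two polling pipelines: fast power telemetry, slow static/connectivity data. A minimal asyncio sketch of that split cadence (the fetch callables are stand-ins for your own client calls, and the production sleep is commented out so the sketch runs instantly):

```python
import asyncio

async def two_tier_poller(fetch_fast, fetch_slow, fast_s=10, slow_s=300, max_ticks=31):
    """Fast cadence for power telemetry; slow cadence for static hardware data."""
    ratio = max(1, slow_s // fast_s)   # slow pipeline fires once per N fast ticks
    fast_calls = slow_calls = 0
    for tick in range(max_ticks):
        await fetch_fast()             # e.g. get_stats()
        fast_calls += 1
        if tick % ratio == 0:
            await fetch_slow()         # e.g. get_connectivity_overview()
            slow_calls += 1
        # In production: await asyncio.sleep(fast_s)
    return fast_calls, slow_calls

async def noop():                      # stand-in for real client calls
    pass

print(asyncio.run(two_tier_poller(noop, noop)))  # -> (31, 2)
```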
# 1. Essential View (Fast, lightweight polling for UIs)
# Fetches active/backup links, AWS cloud connection, and router status.
net = await client.get_connectivity_overview()
primary = net["primary"]
print(f"Cloud Connected: {net['cloud_connected']}")
print(f"Primary Link: {primary['name']} (ID: {primary['id']})")
print(f"IP: {primary['ip']}, Gateway: {primary['gateway']}")
for backup in net["backups"]:
print(f"Backup Link: {backup['name']} (ID: {backup['id']})")
# 2. Deep Diagnostic View (Slower, use only when necessary)
# Pings Modbus 502 on the local IP and checks external SPAN flags.
deep_net = await client.get_connectivity_overview(deep_scan=True)
if deep_net["modbus_tcp_502_open"]:
print("Modbus polling is available locally!")
Historical Energy Data¶
from datetime import date
# Today's energy breakdown
day = await client.get_power_details(type=1, timeperiod="2026-03-18")
# type=1: DAY – hourly breakdown
# This week
week = await client.get_power_details(type=2, timeperiod="2026-03-18")
# type=2: WEEK – daily breakdown
# This month
month = await client.get_power_details(type=3, timeperiod="2026-03-01")
# type=3: MONTH – daily breakdown
# This year
year = await client.get_power_details(type=4, timeperiod="2026-01-01")
# type=4: YEAR – monthly breakdown
# Lifetime totals
lifetime = await client.get_power_details(type=5, timeperiod=str(date.today()))
# type=5: LIFETIME – all-time
Weather & Storm Hedge¶
weather = await client.get_weather()
storms = await client.get_storm_settings()
# Enable Storm Hedge, 60 min advance backup
await client.set_storm_settings(
stormEn=1, # 0=Disabled, 1=Enabled
setAdvanceBackupTime=60, # Minutes before storm to switch to backup
)
Account & Notifications¶
# List all gateways on account
gateways = await client.get_home_gateway_list()
# Get unread notification count
unread = await client.get_unread_count()
# Get recent notifications
notes = await client.get_notifications(pageNum=1, pageSize=20)
# Get warranty info
warranty = await client.get_warranty_info()
# Site info (user ID, roles, distributor)
site = await client.siteinfo()
Diagnostics & Metrics¶
The franklinwh-cloud client tracks detailed telemetry about every API call, retry, and HTTP connection it makes. You can pull this snapshot at any time to build API health dashboards.
# Get a realtime snapshot of API client health
metrics = client.get_metrics()
# The snapshot contains detailed routing and latency info:
print(f"Total API Calls: {metrics['uptime']['total_requests']}")
print(f"CloudFront Edge: {metrics['edge']['last_pop']}")
print(f"Average Latency: {metrics['timing']['avg_ms']:.0f} ms")
# Endpoint specific hit-counts
for ep, hits in metrics['endpoints'].items():
print(f" {ep}: {hits} calls")
# Error tracking and token refreshes
print(f"Parse Errors: {metrics['errors']['parse']}")
print(f"Auth Refreshes: {metrics['uptime']['token_refreshes']}")
Full Example: System Dashboard with SoC Estimation¶
A complete script that displays power flow, operating state, and estimates time to full charge or reserve SoC.
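The time-to-target arithmetic used in the script below is simple enough to factor into a pure helper, which is easier to test in isolation. A sketch (linear extrapolation, which ignores the charge taper near full):

```python
def hours_to_soc(current_soc, target_soc, battery_kwh, power_kw):
    """Hours for the battery to move from current_soc to target_soc at power_kw.

    Pure linear extrapolation - real charging tapers near 100%, so treat
    results near full as optimistic. Returns None when power is below noise.
    """
    if abs(power_kw) < 0.05:                 # ignore <50 W noise
        return None
    delta_kwh = abs(target_soc - current_soc) / 100.0 * battery_kwh
    return delta_kwh / abs(power_kw)

h = hours_to_soc(72, 100, 13.6, 2.1)         # charging at 2.1 kW
print(f"~{int(h)}h {int((h % 1) * 60)}m to 100%")
```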
import asyncio
from franklinwh_cloud import FranklinWHCloud
RESERVE_SOC = 20 # Your backup reserve %
async def main():
client = FranklinWHCloud.from_config("franklinwh.ini")
await client.login()
await client.select_gateway()
# Get dynamic battery capacity directly from the API
device_info = await client.get_device_info()
battery_kwh = device_info.get("result", {}).get("totalCap", 13.6)
stats = await client.get_stats()
c = stats.current
t = stats.totals
# ── Power Flow ──
print("⚡ Power Flow (kW)")
print(f" Solar: {c.solar_production:6.2f} kW")
print(f" Battery: {c.battery_power:6.2f} kW {'charging' if c.battery_power < 0 else 'discharging'}")
print(f" Grid: {c.grid_power:6.2f} kW")
print(f" Home: {c.home_consumption:6.2f} kW")
print()
# ── Status ──
print(f"Battery: {c.battery_soc:.0f}%")
print(f"Mode: {c.work_mode_desc}")
print(f"Status: {c.run_status_desc}")
print(f"Grid: {c.grid_connection_state.value}")
print()
# ── SoC Time Estimation ──
bat_kw = abs(c.battery_power)
if bat_kw > 0.05: # ignore noise below 50W
current_kwh = (c.soc / 100) * battery_kwh
if c.battery_power < 0: # Charging
remaining_kwh = battery_kwh - current_kwh
hours = remaining_kwh / bat_kw
h, m = int(hours), int((hours % 1) * 60)
print(f"⏱️ Estimated ~{h}h {m}m to 100% (at {bat_kw:.1f} kW)")
else: # Discharging
usable_kwh = current_kwh - (RESERVE_SOC / 100) * battery_kwh
if usable_kwh > 0:
hours = usable_kwh / bat_kw
h, m = int(hours), int((hours % 1) * 60)
print(f"⏱️ Estimated ~{h}h {m}m to reserve ({RESERVE_SOC}%) (at {bat_kw:.1f} kW)")
else:
print(f"⚠️ Battery at or below reserve ({RESERVE_SOC}%)")
else:
print("⏸️ Battery idle")
print()
# ── Daily Totals ──
print("Today (kWh)")
print(f" Solar: {t.solar:6.1f}")
print(f" Grid in: {t.grid_import:6.1f}")
print(f" Grid out: {t.grid_export:6.1f}")
print(f" Bat charge: {t.battery_charge:6.1f}")
print(f" Bat disc: {t.battery_discharge:6.1f}")
print(f" Home: {t.home_use:6.1f}")
asyncio.run(main())
Expected output:
⚡ Power Flow (kW)
Solar: 4.20 kW
Battery: -2.10 kW charging
Grid: 0.00 kW
Home: 2.10 kW
Battery: 72%
Mode: Self Consumption
Status: Normal operation
Grid: Connected
⏱️ Estimated ~1h 49m to 100% (at 2.1 kW)
Today (kWh)
Solar: 18.3
Grid in: 2.1
Grid out: 0.0
Bat charge: 8.4
Bat disc: 5.2
Home: 12.0
Full Example: Force Charge via Custom TOU¶
A production-ready script that demonstrates the complete lifecycle: save state → configure PCS → set schedule → monitor → restore.
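The part most worth getting right in that lifecycle is guaranteeing the restore step. A library-free skeleton of the try/finally shape (the callables are placeholders for your own client calls):

```python
import asyncio

async def run_dispatch_lifecycle(save_state, apply_change, monitor, restore):
    """Guarantee restore runs even if monitoring raises or is interrupted."""
    original = await save_state()
    try:
        await apply_change()
        await monitor()
    finally:
        await restore(original)        # always restore, crash or not

# Toy demonstration with an intentional failure mid-monitor:
log = []
async def demo():
    async def save():          log.append("save"); return {"mode": 2}
    async def apply():         log.append("apply")
    async def monitor():       log.append("monitor"); raise RuntimeError("boom")
    async def restore(orig):   log.append(f"restore {orig['mode']}")
    try:
        await run_dispatch_lifecycle(save, apply, monitor, restore)
    except RuntimeError:
        pass

asyncio.run(demo())
print(log)   # restore still ran despite the crash
```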
Phase 1: PCS Preamble β Check Limits & Battery Capacity¶
Before dispatching, ensure the PCS (Power Control System) allows grid charging/discharging at the desired power levels, and check battery capacity to calculate target SoC.
import asyncio
from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.const import (
TIME_OF_USE, SELF_CONSUMPTION, EMERGENCY_BACKUP,
dispatchCodeType, WaveType,
)
from franklinwh_cloud.models import GridStatus, GridConnectionState
# ββ Configuration ββ
CHARGE_START = "11:30"
CHARGE_END = "15:00"
TARGET_SOC = 95.0 # Stop monitoring when SoC reaches this %
POLL_INTERVAL = 60 # Seconds between monitoring polls
DISPATCH = dispatchCodeType.GRID_CHARGE # 8 = charge from solar+grid
WAVE_TYPE = WaveType.OFF_PEAK # 0 = off-peak pricing tier
async def main():
client = FranklinWHCloud.from_config("franklinwh.ini")
await client.login()
await client.select_gateway()
# ── 1a. Save original state (for restore later) ──
original_mode = await client.get_mode()
original_mode_id = original_mode.get("workMode", SELF_CONSUMPTION)
original_mode_name = original_mode.get("modeName", "?")
original_schedule = await client.get_tou_dispatch_detail()
print(f"Saved state - Mode: {original_mode_name} (workMode={original_mode_id})")
# ── 1b. Check battery capacity & inverter limits via API ──
device_info = await client.get_device_info()
result_data = device_info.get("result", {})
# We use hardcoded fallback values (e.g. 13.6 kWh, 5.0 kW) purely as a safety net
# to prevent division-by-zero crashes in the rare event of an API failure.
# The Cloud API should realistically never return these as null.
battery_count = len(result_data.get("apowerList", [])) or 1
total_capacity_kwh = result_data.get("totalCap", 13.6)
nameplate_power_kw = result_data.get("totalPower", 5.0 * battery_count)
print(f"Batteries: {battery_count} × aPower = {total_capacity_kwh:.1f} kWh")
print(f"⚡ Nameplate Inverter Max Power: {nameplate_power_kw:.1f} kW continuous")
stats = await client.get_stats()
current_soc = stats.current.soc
print(f"Current SoC: {current_soc:.0f}% → Target: {TARGET_SOC:.0f}%")
if current_soc >= TARGET_SOC:
print("✅ Already at target SoC - nothing to do.")
return
# ── 1c. Check and set PCS limits ──
pcs = await client.get_power_control_settings()
charge_limit = pcs.get("result", {}).get("globalGridChargeMax", -1)
discharge_limit = pcs.get("result", {}).get("globalGridDischargeMax", -1)
print(f"⚡ PCS limits - Grid charge: {charge_limit} kW, Grid discharge: {discharge_limit} kW")
print(" (-1 = unlimited, 0 = disabled)")
if charge_limit == 0:
print("⚠️ Grid charging is DISABLED (0 kW). Enabling unlimited...")
await client.set_power_control_settings(
globalGridChargeMax=-1, # -1 = unlimited
globalGridDischargeMax=discharge_limit, # keep existing
)
print("✅ Grid charging enabled")
# ── 1d. Ensure we are in TOU mode ──
if original_mode_id != TIME_OF_USE:
print(f"Switching to TOU mode (was {original_mode_name})...")
await client.set_mode(TIME_OF_USE, None, None, None, None)
await asyncio.sleep(3)
print("✅ Now in TOU mode")
Phase 2: Submit Schedule with Error Handling¶
Handle common errors: invalid time format, bad dispatch codes, and API failures.
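The client-side checks can be mirrored locally before ever hitting the API. A library-free sketch of the rules the error messages quote (assumed: times are HH:MM within 00:00–24:00 on 30-minute boundaries, and dispatchId is in the known set):

```python
VALID_DISPATCH_IDS = {1, 2, 3, 6, 7, 8}

def validate_block(block):
    """Pre-flight a schedule block; returns a list of problems (empty = OK)."""
    problems = []
    for key in ("startTime", "endTime"):
        value = block.get(key, "")
        try:
            hh, mm = value.split(":")
            hh, mm = int(hh), int(mm)
            in_range = (0 <= hh < 24 and 0 <= mm < 60) or (hh == 24 and mm == 0)
            if not in_range:
                problems.append(f"{key} {value!r} outside 00:00-24:00")
            elif mm % 30 != 0:
                problems.append(f"{key} {value!r} not on a 30-minute boundary")
        except (ValueError, AttributeError):
            problems.append(f"{key} missing or not HH:MM: {value!r}")
    if block.get("dispatchId") not in VALID_DISPATCH_IDS:
        problems.append(
            f"dispatchId {block.get('dispatchId')!r} not in {sorted(VALID_DISPATCH_IDS)}")
    return problems

print(validate_block({"startTime": "11:30", "endTime": "15:00", "dispatchId": 8}))  # -> []
print(validate_block({"startTime": "25:00", "endTime": "11:70", "dispatchId": 99}))
```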
# ── 2. Set charge schedule with error handling ──
print(f"\n⏱️ Setting grid charge window {CHARGE_START}–{CHARGE_END}...")
try:
result = await client.set_tou_schedule(
touMode="CUSTOM",
touSchedule=[{
"startTime": CHARGE_START,
"endTime": CHARGE_END,
"dispatchId": DISPATCH.value, # dispatchCodeType.GRID_CHARGE = 8
"waveType": WAVE_TYPE.value, # WaveType.OFF_PEAK = 0
}],
default_mode="SELF", # Outside window = self-consumption (dispatchId=6)
)
except ValueError as e:
# set_tou_schedule validates times, dispatch codes, and JSON structure
# Common errors:
# - Invalid time: "25:00" or "11:70" or missing startTime
# - Bad dispatch: dispatchId=99 (not in valid set 1,2,3,6,7,8)
# - Malformed JSON: missing required fields
print(f"❌ Validation error: {e}")
print(" Check: times must be HH:MM (00:00–24:00), 30-min boundaries")
print(" Check: dispatchId must be one of: 1,2,3,6,7,8")
return
except Exception as e:
# API-level errors (network, auth, server-side rejection)
print(f"❌ API error: {type(e).__name__}: {e}")
return
# Check API response for success
status = result.get("status")
if status != 0:
msg = result.get("msg", "Unknown error")
print(f"❌ Server rejected schedule: status={status}, msg={msg}")
return
tou_id = result.get("result", {}).get("id", "?")
print(f"✅ Schedule submitted – touId={tou_id}")
# ── 2b. Verify schedule applied ──
await asyncio.sleep(5) # Give aGate time to apply
detail = await client.get_tou_dispatch_detail()
blocks = detail.get("result", {}).get("detailVoList", [])
print(f"\nActive schedule ({len(blocks)} blocks):")
for b in blocks:
name = b.get("dispatchName", "?")
start = b.get("startTime", "?")
end = b.get("endTime", "?")
wave = b.get("waveType", "?")
print(f" {start}–{end} {name} (waveType={wave})")
if not blocks:
print("⚠️ No dispatch blocks found - schedule may not have applied!")
Phase 3: Monitor Power Flow & SoC¶
Poll the system to confirm the dispatch is executing correctly: checking that the operating mode is still TOU, power is flowing in the expected direction, and the SoC target has been reached.
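The direction checks in the loop reduce to a small pure function, which is easier to unit-test than the loop itself. A sketch using this cookbook's sign conventions (battery negative = charging, grid positive = importing); the dispatch labels here are illustrative strings, not library constants:

```python
def flow_matches_dispatch(dispatch, bat_kw, grid_kw, noise_kw=0.05):
    """True when measured power flow is consistent with the active dispatch.

    Sign convention (per this cookbook): battery < 0 = charging,
    grid > 0 = importing. "grid_charge" expects charging without exporting;
    "grid_export" expects discharging while exporting.
    """
    if dispatch == "grid_charge":
        return bat_kw < -noise_kw and grid_kw > -noise_kw
    if dispatch == "grid_export":
        return bat_kw > noise_kw and grid_kw < noise_kw
    return True                      # other dispatches: no strict expectation

print(flow_matches_dispatch("grid_charge", bat_kw=-4.8, grid_kw=2.7))  # -> True
print(flow_matches_dispatch("grid_charge", bat_kw=0.3, grid_kw=2.7))   # -> False
```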
# ── 3. Monitor loop ──
print(f"\nMonitoring every {POLL_INTERVAL}s until SoC ≥ {TARGET_SOC}%...")
print(" Press Ctrl+C to stop monitoring early.\n")
try:
while True:
stats = await client.get_stats()
c = stats.current
soc = c.soc
bat_kw = c.battery_power # negative = charging, positive = discharging
grid_kw = c.grid_power # positive = importing, negative = exporting
solar_kw = c.solar_production
mode_desc = c.work_mode_desc
grid_status = c.grid_status
# ── 3a. Mode check - still in TOU? ──
if "Time of Use" not in mode_desc:
print(f"⚠️ Mode changed to '{mode_desc}' - expected TOU!")
print(" The system may have switched due to storm hedge or app override.")
break
# ── 3b. Grid check - still connected? ──
grid_state = stats.current.grid_connection_state
if grid_state != GridConnectionState.CONNECTED:
print(f"⚠️ Grid state: {grid_state.value} - cannot charge from grid while not connected!")
break
# ── 3c. Power flow direction ──
charging = bat_kw < -0.05 # Battery drawing > 50W = charging
if DISPATCH == dispatchCodeType.GRID_CHARGE:
# Grid charge: expect battery charging (bat_kw < 0) AND grid importing (grid_kw > 0)
flow_ok = charging
direction = "CHARGING" if charging else "IDLE/DISCHARGING"
grid_dir = f"grid={'importing' if grid_kw > 0 else 'exporting'} {abs(grid_kw):.2f} kW"
elif DISPATCH == dispatchCodeType.GRID_EXPORT:
# Grid export: expect battery discharging (bat_kw > 0) AND grid exporting (grid_kw < 0)
discharging = bat_kw > 0.05
flow_ok = discharging
direction = "DISCHARGING" if discharging else "IDLE/CHARGING"
grid_dir = f"grid={'exporting' if grid_kw < 0 else 'importing'} {abs(grid_kw):.2f} kW"
else:
# Other dispatches (SELF, SOLAR, HOME, STANDBY)
flow_ok = True
direction = f"bat={bat_kw:+.2f} kW"
grid_dir = f"grid={grid_kw:+.2f} kW"
status_icon = "✅" if flow_ok else "⚠️"
print(
f" {status_icon} SoC: {soc:5.1f}% | "
f"Battery: {bat_kw:+6.2f} kW {direction} | "
f"Solar: {solar_kw:.2f} kW | {grid_dir}"
)
# ── 3d. SoC target reached? ──
if DISPATCH == dispatchCodeType.GRID_CHARGE and soc >= TARGET_SOC:
print(f"\nTarget SoC reached: {soc:.0f}% ≥ {TARGET_SOC:.0f}%")
break
if DISPATCH == dispatchCodeType.GRID_EXPORT and soc <= TARGET_SOC:
# For export, TARGET_SOC is the minimum SoC before stopping
print(f"\nMinimum SoC reached: {soc:.0f}% ≤ {TARGET_SOC:.0f}%")
break
await asyncio.sleep(POLL_INTERVAL)
except KeyboardInterrupt:
print("\n⏹️ Monitoring stopped by user.")
Phase 4: Restore Original State¶
Always restore the original TOU schedule and operating mode β even if monitoring was interrupted.
# ── 4. Restore original state ──
print("\nRestoring original state...")
# 4a. Restore original TOU schedule
try:
orig_blocks = original_schedule.get("result", {}).get("detailVoList", [])
if orig_blocks:
# Re-submit the original schedule blocks
restore_schedule = []
for b in orig_blocks:
restore_schedule.append({
"startTime": b.get("startTime", "0:00"),
"endTime": b.get("endTime", "24:00"),
"dispatchId": b.get("dispatchId", 6),
"waveType": b.get("waveType", 0),
})
await client.set_tou_schedule(
touMode="CUSTOM",
touSchedule=restore_schedule,
default_mode="SELF",
)
print(f"✅ Restored original TOU schedule ({len(orig_blocks)} blocks)")
else:
# No blocks = was flat self-consumption
await client.set_tou_schedule(touMode="SELF")
print("✅ Restored to full-day self-consumption")
except Exception as e:
print(f"⚠️ Could not restore TOU schedule: {e}")
print(" You may need to manually restore via the FranklinWH app.")
# 4b. Restore original operating mode
if original_mode_id != TIME_OF_USE:
try:
await client.set_mode(original_mode_id, None, None, None, None)
print(f"✅ Restored operating mode to {original_mode_name} (workMode={original_mode_id})")
except Exception as e:
print(f"⚠️ Could not restore mode: {e}")
# 4c. Final state confirmation
await asyncio.sleep(3)
final_mode = await client.get_mode()
final_stats = await client.get_stats()
print("\nFinal state:")
print(f" Mode: {final_mode.get('modeName', '?')}")
print(f" SoC: {final_stats.current.soc:.0f}%")
print(f" Grid: {final_stats.current.grid_status.name}")
print("✅ Done!")
asyncio.run(main())
Expected output (grid charge scenario):
Saved state - Mode: Self Consumption (workMode=2)
Batteries: 1 × aPower = 13.6 kWh
Current SoC: 42% → Target: 95%
⚡ PCS limits - Grid charge: -1 kW, Grid discharge: -1 kW
 (-1 = unlimited, 0 = disabled)
Switching to TOU mode (was Self Consumption)...
✅ Now in TOU mode
⏱️ Setting grid charge window 11:30–15:00...
✅ Schedule submitted – touId=12345
Active schedule (3 blocks):
 0:00–11:30 Self-consumption (waveType=0)
 11:30–15:00 Grid charge (waveType=0)
 15:00–24:00 Self-consumption (waveType=0)
Monitoring every 60s until SoC ≥ 95%...
✅ SoC: 42.3% | Battery: -4.80 kW CHARGING | Solar: 2.10 kW | grid=importing 2.70 kW
✅ SoC: 48.7% | Battery: -4.90 kW CHARGING | Solar: 3.40 kW | grid=importing 1.50 kW
✅ SoC: 55.1% | Battery: -5.00 kW CHARGING | Solar: 4.20 kW | grid=importing 0.80 kW
...
✅ SoC: 94.8% | Battery: -1.20 kW CHARGING | Solar: 3.80 kW | grid=importing 0.00 kW
Target SoC reached: 95% ≥ 95%
Restoring original state...
✅ Restored original TOU schedule (3 blocks)
✅ Restored operating mode to Self Consumption (workMode=2)
Final state:
 Mode: Self Consumption
 SoC: 95%
 Grid: NORMAL
✅ Done!
[!CAUTION] This script modifies your live aGate TOU schedule and operating mode. Always test during off-peak hours. The script saves and restores state, but if it crashes mid-execution, restore manually via the FranklinWH app.
See TOU_SCHEDULE_GUIDE.md for dispatch codes, known limitations, and the 30-minute boundary rule.
Smart Circuits & EV Charging¶
V2 Firmware Mutations & Compatibility¶
FranklinWH recently migrated Smart Circuit (Sw) payloads from V1 integer timers (Sw1OpenTime/Sw1CloseTime2) to V2 string arrays (time_enabled, time_schedules, time_set).
* Impact: Writing schedules using the V1 integer schema will fail outright or be silently ignored by modern aGates.
* Reading: franklinwh-cloud transparently parses the V2 payloads into SmartCircuitDetail dataclasses. You will see string representations like '2025-10-04 20:11'.
* Writing: Until the exact V2 payload constructor is fully mapped natively, schedules should only be modified manually or toggled dynamically using boolean switches (set_smart_switch_state) and amperage limits (set_smart_circuit_load_limit).
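Those V2 timestamp strings parse cleanly with the stdlib. A small sketch for turning them into schedule decisions (the format string is inferred from the '2025-10-04 20:11' sample above):

```python
from datetime import datetime

V2_TIME_FORMAT = "%Y-%m-%d %H:%M"   # inferred from the '2025-10-04 20:11' sample

def parse_v2_time(value):
    """Parse a V2 schedule timestamp string into a naive datetime."""
    return datetime.strptime(value, V2_TIME_FORMAT)

ts = parse_v2_time("2025-10-04 20:11")
print(ts.hour, ts.minute)           # -> 20 11
```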
Advanced Regional Discovery & Renaming¶
US and AU/EU markets exhibit vastly different Smart Circuit topologies. US grids support multiple aGates chained together and "merged" Smart Circuits, while AU grids standardise on 3 physical outputs per aGate with V2L. You can parse the DeviceSnapshot tree to adapt your integration intelligently:
import asyncio
from franklinwh_cloud import Client
async def diagnose_regional_smart_circuits():
client = Client("user@example.com", "secret")
await client.login()
await client.select_gateway()
# Tier-2 discovery loads hardware quirks and accessories
snapshot = await client.discover(tier=2)
acc = snapshot.accessories
# Check if this gateway supports the US "merge" functionality or AU V2L
quirks = acc.get("gateway", {}).get("region_quirks", {})
if quirks.get("supports_smart_circuit_merge"):
print("US Region Detected: installation may contain merged Smart Circuits spanning multiple aGates.")
elif quirks.get("supports_v2l"):
print("AU Region Detected: Vehicle-to-Load output may occupy Smart Circuit 1.")
# Detect if a Generator or Smart Circuits are physically installed
installed = acc.get("installed", [])
if "Generator" in installed:
gen_status = acc.get("accessories", {}).get("generator", {}).get("status_desc")
print(f"⚡ Generator physically wired. Current State: {gen_status}")
if "Smart_Circuit" in installed:
count = acc.get("accessories", {}).get("smart_circuits", {}).get("count", 0)
print(f"{count} Smart Circuits physically tracked by this aGate.")
# Renaming a Smart Circuit requires a raw dictionary invoke as the API is undocumented
print("Renaming Circuit 2 to 'EV Charger'...")
circuit_payload = {"swId": 2, "name": "EV Charger"}
await client._post("/hes-gateway/terminal/updateSmartCircuitName", payload=circuit_payload)
# asyncio.run(diagnose_regional_smart_circuits())
Complex Automation: Adaptive EV Solar Charging¶
This script tracks solar PV, SOC, and home load: it waits until native solar generation exceeds a set threshold, confirms excess power is actually being exported to the grid, checks that the battery is sufficiently charged, and optionally reaches out to a Home Assistant WebSocket (like the Enphase integration) to verify EV presence before switching the FranklinWH Smart Circuit ON.
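The on/off criteria plus the hysteresis band reduce to a pure decision function, which is easier to unit-test than the polling loop itself. A sketch whose thresholds mirror the configuration in the script below:

```python
def ev_circuit_decision(solar_kw, soc, grid_kw, circuit_on,
                        solar_min=4.0, soc_min=80, grid_cutoff=1.0):
    """Return "on", "off", or "hold" for the EV Smart Circuit.

    Turn ON only when solar is strong, the battery is healthy, and the
    grid is exporting (grid_kw < 0). Turn OFF when already on and grid
    import spikes past the cutoff - the differing thresholds give hysteresis.
    """
    if not circuit_on and solar_kw >= solar_min and soc >= soc_min and grid_kw < 0:
        return "on"
    if circuit_on and grid_kw > grid_cutoff:
        return "off"
    return "hold"

print(ev_circuit_decision(5.2, 92, -1.4, circuit_on=False))  # -> on
print(ev_circuit_decision(2.0, 92, 1.8, circuit_on=True))    # -> off
print(ev_circuit_decision(3.0, 92, 0.2, circuit_on=True))    # -> hold
```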
```python
import asyncio
import json

import websockets  # pip install websockets

from franklinwh_cloud import FranklinWHCloud

# Configuration thresholds
SOLAR_THRESHOLD_KW = 4.0   # Minimum solar generation required to charge the EV
MIN_SOC_PERCENT = 80       # Minimum FranklinWH battery level required
EV_CIRCUIT_ID = 2          # The Smart Circuit physical port (1, 2, or 3)
HA_WS_URL = "ws://homeassistant.local:8123/api/websocket"
HA_TOKEN = "eyJhbGciOiJIUzI1..."


async def check_ev_presence_ha():
    """Reach out to Home Assistant to verify the EV is plugged in and needs charge."""
    try:
        async with websockets.connect(HA_WS_URL) as ws:
            await ws.send(json.dumps({"type": "auth", "access_token": HA_TOKEN}))
            await ws.recv()  # Wait for auth_ok
            # Request EV state from a custom HA Enphase/Tesla integration entity
            req_id = 1
            await ws.send(json.dumps({"id": req_id, "type": "get_states"}))
            response = json.loads(await ws.recv())
            for entity in response.get("result", []):
                if entity["entity_id"] == "sensor.ev_charger_status":
                    return entity["state"] == "plugged_in"
    except Exception as e:
        print(f"HA WebSocket unreachable: {e}. Defaulting to True for safety.")
    return True


async def adaptive_ev_charging_loop():
    client = FranklinWHCloud("user@example.com", "secret")
    while True:
        try:
            if not client.is_authenticated():
                await client.login()
                await client.select_gateway()

            stats = await client.get_stats()
            solar_kw = stats.current.solar_production
            grid_kw = stats.current.grid_power
            soc = stats.current.soc

            # Check if the Smart Circuit is already ON
            circuits = await client.get_smart_circuits_info()
            ev_circuit_active = circuits.get(EV_CIRCUIT_ID).is_on

            # Criteria 1: plenty of solar headroom
            # Criteria 2: battery SOC is healthy
            # Criteria 3: grid power is negative (excess power not consumed by the home)
            solar_sufficient = solar_kw >= SOLAR_THRESHOLD_KW
            battery_sufficient = soc >= MIN_SOC_PERCENT
            excess_power_available = grid_kw < 0

            if solar_sufficient and battery_sufficient and excess_power_available:
                if not ev_circuit_active:
                    # Optional: verify the EV is physically connected via Home Assistant
                    ev_plugged_in = await check_ev_presence_ha()
                    if ev_plugged_in:
                        print(f"Criteria met (Solar: {solar_kw}kW, SOC: {soc}%, Grid: {grid_kw}kW). Activating EV circuit!")
                        # Toggle the specific Smart Circuit ON, leaving the others untouched
                        switches = [None, None, None]
                        switches[EV_CIRCUIT_ID - 1] = True
                        await client.set_smart_switch_state(tuple(switches))

            # Hysteresis / shutdown logic:
            # if clouds roll in and we start heavily drawing from the grid, cut the charger
            elif grid_kw > 1.0 and ev_circuit_active:
                print("Solar dropped or grid demand spiked. Deactivating EV circuit.")
                switches = [None, None, None]
                switches[EV_CIRCUIT_ID - 1] = False
                await client.set_smart_switch_state(tuple(switches))
        except Exception as e:
            print(f"Polling fault: {e}")

        await asyncio.sleep(60)  # Poll every 60 seconds

# asyncio.run(adaptive_ev_charging_loop())
```
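The `switches` tuple built above is worth isolating: `set_smart_switch_state()` takes a 3-slot tuple where `None` leaves a circuit untouched and only the targeted slot carries the new state. A minimal helper (the function name `circuit_switch_tuple` is illustrative, not part of the library):

```python
def circuit_switch_tuple(circuit_id: int, state: bool) -> tuple:
    """Build the 3-slot tuple for set_smart_switch_state().

    None means "leave this circuit as-is"; only the targeted slot
    (1-based circuit_id mapped to a 0-based index) carries the new state.
    """
    switches = [None, None, None]
    switches[circuit_id - 1] = state
    return tuple(switches)

print(circuit_switch_tuple(2, True))   # (None, True, None)
print(circuit_switch_tuple(3, False))  # (None, None, False)
```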
Storm Hedge¶
Real-Time Weather Event Polling¶
The FranklinWH aGate constantly polls national weather services for incoming storm cells. You can proactively query the same internal lists to drive Home Assistant automations that trigger shutters or pre-chill HVAC systems.
```python
async def poll_weather_events():
    client = FranklinWHCloud("user@example.com", "secret")
    await client.login()
    await client.select_gateway()

    # Get active storm warnings tracked by the aGate
    active_storms = await client.get_progressing_storm_list()
    if active_storms:
        for storm in active_storms:
            print(f"🚨 Storm detected: {storm.get('title')}")
            print(f"   Severity: {storm.get('severity')}")
            print(f"   Time: {storm.get('effective')} -> {storm.get('expires')}")
    else:
        print("☀️ No severe weather events tracked by FranklinWH.")

    # Check if Storm Hedge is actively protecting the battery
    storm_settings = await client.get_storm_settings()
    if storm_settings.get("switchStatus") == 1:
        print(f"Storm Hedge enabled! Reserve SOC protected at {storm_settings.get('backUpSoc')}%")
```
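When piping these alerts into an automation, it helps to flatten each storm dict into one notification line. A sketch, assuming only the four fields used in the recipe above (`title`, `severity`, `effective`, `expires`); the helper name `summarize_storm` is illustrative:

```python
def summarize_storm(storm: dict) -> str:
    """Flatten one storm alert dict (fields as used by the recipe above)
    into a single line suitable for a notification payload."""
    title = storm.get("title", "Unknown event")
    severity = storm.get("severity", "Unknown")
    window = f"{storm.get('effective', '?')} -> {storm.get('expires', '?')}"
    return f"{title} [{severity}] {window}"

# Example with the field names shown above:
alert = {
    "title": "Severe Thunderstorm Warning",
    "severity": "Severe",
    "effective": "2024-05-01T14:00Z",
    "expires": "2024-05-01T18:00Z",
}
print(summarize_storm(alert))
# Severe Thunderstorm Warning [Severe] 2024-05-01T14:00Z -> 2024-05-01T18:00Z
```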
Dispatch Code Reference¶
| Code | `dispatchCodeType` Enum | Description |
|---|---|---|
| 1 | `HOME` / `HOME_LOADS` | aPower to home (surplus solar to grid) |
| 2 | `STANDBY` | aPower on standby (surplus solar to grid) |
| 3 | `SOLAR` / `SOLAR_CHARGE` | aPower charges from solar |
| 6 | `SELF` / `SELF_CONSUMPTION` | Self-consumption (surplus solar to grid) |
| 7 | `GRID_EXPORT` / `GRID_DISCHARGE` / `FORCE_DISCHARGE` | aPower to home/grid |
| 8 | `GRID_CHARGE` / `GRID_IMPORT` / `FORCE_CHARGE` | aPower charges from solar/grid |
Method Count Summary¶
| Mixin | Methods | Category |
|---|---|---|
| `discover.py` | 1 | Device discovery (3-tier survey) |
| `stats.py` | 4 | Power flow, runtime data |
| `modes.py` | 4 | Operating mode control |
| `tou.py` | 17 | TOU schedule + tariff management |
| `power.py` | 5 | Grid status, PCS settings |
| `devices.py` | 16 | Hardware, BMS, smart circuits, LED, generator |
| `storm.py` | 5 | Weather, Storm Hedge |
| `account.py` | 18 | Account, notifications, alarms, AI |
| **Total** | **70** | |
Exception Handling & Reliability¶
FranklinWH Cloud infrastructure can occasionally experience timeouts, offline gateways, or session invalidation. Wrap your integration scripts in handlers for the library's defined exception classes.
```python
import asyncio

from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.exceptions import (
    FranklinWHTimeoutError,
    DeviceTimeoutException,
    GatewayOfflineException,
    TokenExpiredException,
)


async def reliable_polling_loop():
    client = FranklinWHCloud("user@example.com", "secret")
    while True:
        try:
            if not client.is_authenticated():
                await client.login()
                await client.select_gateway()

            # Perform routine queries
            snapshot = await client.get_stats()
            print(f"Current SoC: {snapshot.current.soc}%")
        except FranklinWHTimeoutError as e:
            # Raised when httpx or the socket drops the connection
            print(f"Cloud API timed out: {e}. Retrying in 60s...")
        except DeviceTimeoutException as e:
            # Raised when the Cloud API is online, but your physical aGate stopped communicating
            print(f"Your gateway dropped offline from the mesh: {e}")
        except GatewayOfflineException as e:
            # Raised explicitly when attempting a WRITE command against a known-offline gateway
            print(f"WRITE blocked. Gateway disconnected: {e}")
        except TokenExpiredException:
            # Raised when the JWT expires
            print("Session dead. Flagging rotation for next loop...")
            client.clear_token()
        except Exception as e:
            # Generic catch-all for parsing failures
            print(f"Unexpected fault: {e}")

        await asyncio.sleep(60)
```
Key Exception Hierarchy¶
All library-specific exceptions inherit from `FranklinWHError`. Below are the primary failure modes:
| Exception Class | Cause | Resolution |
|---|---|---|
| `FranklinWHTimeoutError` | Raw API unresponsive / connection reset | Standard retry backoff. |
| `DeviceTimeoutException` | Node-to-cloud telemetry lost (`offlineReason` triggered). | Investigate edge WiFi or wait for 4G cellular failover. |
| `TokenExpiredException` | JWT session rotation required. | Invoke `client.login()` or `client.get_token()` to renew. |
| `InvalidCredentialsException` | 401 Unauthorized (Code `10009`). | Verify username/password or `LOGIN_TYPE` flag. |
| `BadRequestParsingError` | JSON payload schema changed (V1 to V2). | Ensure dependencies track the latest PyPI distribution. |
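The table implies a simple retry policy: back off on `FranklinWHTimeoutError`, never retry `InvalidCredentialsException`. A minimal async wrapper sketch; the stand-in exception classes below exist only so the snippet runs standalone, and in real code you would import the actual classes from `franklinwh_cloud.exceptions`:

```python
import asyncio

# Stand-ins so the sketch runs standalone; use the real classes from
# franklinwh_cloud.exceptions in production code.
class FranklinWHTimeoutError(Exception): ...
class InvalidCredentialsException(Exception): ...

async def with_backoff(coro_factory, attempts: int = 3, base_delay: float = 1.0):
    """Retry transient timeouts with exponential backoff; never retry auth failures."""
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except InvalidCredentialsException:
            raise  # retrying bad credentials only risks an account lockout
        except FranklinWHTimeoutError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

# Demo: fails twice with a timeout, then succeeds on the third attempt.
calls = {"n": 0}
async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise FranklinWHTimeoutError("simulated drop")
    return "ok"

result = asyncio.run(with_backoff(flaky, base_delay=0.01))
print(result)  # ok
```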
Raw Cloud Endpoints vs Pythonic Aliases¶
The franklinwh-cloud library internally negotiates with a series of undocumented REST and MQTT endpoints. To keep abstractions clean, it provides simplified `snake_case` wrappers (e.g. `get_device_info()`, `get_stats()`) around these raw endpoints.
If you are inspecting or reverse-engineering the native Cloud API, you can hit the raw endpoints directly via `client._get()` and `client._post()`. This bypasses the dataclass parsing and returns the raw JSON.
```python
# The Pythonic wrapper (recommended):
# safely parses the response, resolves types, and returns a Stats dataclass.
stats = await client.get_stats()

# The raw Cloud API equivalent (for reverse-engineering):
# use the internal protected method to hit the endpoint exactly as the mobile app does.
url = "https://energy.franklinwh.com/hes-gateway/terminal/getDeviceCompositeInfo"

# Most terminal operations require the gateway ID
raw_json = await client._get(url, params={"gatewayId": client.gateway})
print(f"Raw firmware string: {raw_json['result']['runtimeData']['fhpVersions']}")
```
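Raw responses arrive in a `{"code": ..., "result": ...}` envelope, as the `raw_json['result']` access above suggests. A small unwrapping helper is a convenient sketch, assuming `200` is the success code and an optional `message` field on errors (both assumptions; the helper is not part of the library):

```python
def unwrap(envelope: dict):
    """Return the `result` payload from a raw cloud envelope, or raise.

    Assumes the {"code": 200, "result": ...} shape seen in raw responses;
    non-200 codes are surfaced with the server message if present.
    """
    code = envelope.get("code")
    if code != 200:
        raise RuntimeError(f"API error code={code}: {envelope.get('message')}")
    return envelope.get("result")

payload = {"code": 200, "result": {"runtimeData": {"fhpVersions": "1.2.3"}}}
print(unwrap(payload)["runtimeData"]["fhpVersions"])  # 1.2.3
```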
Extracting CloudFront Edge Failovers (Log Analysis)¶
The library actively tracks CloudFront PoP (Point of Presence) edge routing nodes by automatically reading the x-amz-cf-pop headers returned by AWS. A transition event is raised when AWS dynamically shifts your active internet session to a different geographical edge location (for example: SYD62-P1 → MEL50-C1).
The edge tracker mechanism automatically emits a standard application WARN log whenever your active session shifts. You can extract these failover events directly from your historical application logs (e.g., if you pipe them via standard out or text logging files) using basic log analysis.
Example command (grep):

```shell
# Search historical logs for edge failover warnings
grep "CloudFront edge transition" app.log

# Expected terminal output:
# [WARN] ☁️ CloudFront edge transition: SYD62-P1 → MEL50-C1
# [WARN] ☁️ CloudFront edge transition: MEL50-C1 → SYD62-P1
```
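If you want structured data instead of grep hits, the same warnings can be parsed with a regex. A sketch, assuming the `transition: FROM → TO` line format shown above (the leading tag and emoji may vary with your log formatter):

```python
import re

# Sample log text in the format shown above.
LOG = """\
[WARN] CloudFront edge transition: SYD62-P1 → MEL50-C1
[INFO] polling ok
[WARN] CloudFront edge transition: MEL50-C1 → SYD62-P1
"""

# Capture the "from" and "to" PoP codes from each failover warning.
PATTERN = re.compile(r"CloudFront edge transition: (\S+) → (\S+)")

transitions = PATTERN.findall(LOG)
print(transitions)
# [('SYD62-P1', 'MEL50-C1'), ('MEL50-C1', 'SYD62-P1')]
```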
If you wish to inspect the active routing map from inside your Python code, you can take a snapshot. The exact accessor depends on the client version, so guard the call:

```python
# Guard the call in case the metrics API is not exposed on this client version
if hasattr(client, "get_metrics"):
    edge_data = client.get_metrics()["edge"]
# or via client.get_edge_tracker().snapshot() if exposed
```
Wave Type (Pricing Tier) Reference¶
| Code | `WaveType` Enum | Description |
|---|---|---|
| 0 | `OFF_PEAK` | Off-peak pricing tier |
| 1 | `MID_PEAK` | Mid-peak pricing tier |
| 2 | `ON_PEAK` | On-peak pricing tier |
| 4 | `SUPER_OFF_PEAK` | Super off-peak pricing tier |
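Note that code 3 is absent from the table, so a decoder should not assume the codes are contiguous. A minimal lookup sketch (`wave_type_name` is an illustrative helper, not a library function):

```python
# Codes exactly as listed in the table above; 3 is intentionally absent.
WAVE_TYPES = {0: "OFF_PEAK", 1: "MID_PEAK", 2: "ON_PEAK", 4: "SUPER_OFF_PEAK"}

def wave_type_name(code: int) -> str:
    """Decode a pricing-tier code, falling back explicitly for gaps like 3."""
    return WAVE_TYPES.get(code, f"UNKNOWN({code})")

print(wave_type_name(4))  # SUPER_OFF_PEAK
print(wave_type_name(3))  # UNKNOWN(3)
```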
Work Mode Reference¶
| Code | `workModeType` Enum | Constant | Description |
|---|---|---|---|
| 1 | `TIME_OF_USE` | `TIME_OF_USE` | TOU dispatch schedule controls the battery |
| 2 | `SELF_CONSUMPTION` | `SELF_CONSUMPTION` | Maximise solar self-use |
| 3 | `EMERGENCY_BACKUP` | `EMERGENCY_BACKUP` | Full battery backup |
Import: `from franklinwh_cloud.const import TIME_OF_USE, SELF_CONSUMPTION, EMERGENCY_BACKUP`

Full details in TOU_SCHEDULE_GUIDE.md.
🩺 TOU Health Check & Recovery¶
The FranklinWH Cloud API has no native endpoint to confirm whether a gateway is physically executing its programmed TOU schedule. A gateway can be in TOU mode but stuck in standby: the cloud reports the schedule as active, but the battery does nothing. This section documents the library's diagnostic and recovery tooling built to detect and resolve this condition.
> [!IMPORTANT]
> These methods make real API calls and `reset_tou_mode()` writes to the gateway. Call them sparingly and on-demand only, never in a polling loop. For multi-gateway accounts, always scope calls to the specific gateway that has the suspected fault. Do not call health checks across all gateways on every poll.
Background: touSendStatus Field¶
`getGatewayTouListV2` (called internally by `get_tou_health()`) returns two sync indicator fields alongside the schedule. These are the only confirmed values, observed across 1,253 HTTP Toolkit HAR samples:
| `touSendStatus` | `touAlertMessage` | Count | Meaning |
|---|---|---|---|
| `null` | `null` | 1,141 | ✅ Settled – gateway has the current schedule, no sync pending |
| `2` | `"Package settings are taking effect"` | 25 | 🔄 In-progress – schedule sent, gateway applying it now |
| `3` | `"Package settings synchronization of failed, please check after setting again"` | 87 | ⚠️ Sync alert – cloud did not receive gateway ACK |
> [!NOTE]
> `touSendStatus=3` is not always a real failure. The gateway may have applied the schedule to its local DB but the ACK response was lost (e.g. a brief internet drop during sync). The mobile app surfaces this as a modal dialog and allows the user to "Apply Again". Treat it as a warning, not a hard fault, and cross-reference with `run_status` before acting.
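The mapping from the raw field to sync labels can be sketched as a tiny classifier. The three labels mirror the `sync_status` values documented for `get_tou_health()` below; the `SYNC_UNKNOWN` fallback for unobserved values is my own addition, not a library behaviour:

```python
def classify_sync(tou_send_status):
    """Map a raw touSendStatus value to a sync label, per the table above."""
    if tou_send_status is None:       # null: settled, no sync pending
        return "SYNC_OK"
    if tou_send_status == 2:          # schedule sent, gateway applying it
        return "SYNC_PENDING"
    if tou_send_status == 3:          # warning only: cross-check run_status
        return "SYNC_ALERT"
    return "SYNC_UNKNOWN"             # unobserved value; treat conservatively

print([classify_sync(v) for v in (None, 2, 3, 9)])
# ['SYNC_OK', 'SYNC_PENDING', 'SYNC_ALERT', 'SYNC_UNKNOWN']
```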
get_tou_health() – Read-Only Diagnostic¶
Mixin: `TouMixin` | Calls: `getGatewayTouListV2` + `get_tou_info(1)` + live stats
Returns a single verdict dict describing whether the gateway is correctly executing the active TOU schedule block for the current time.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `live_stats` | `dict \| None` | `None` | Pass the last polled stats dict to avoid an extra API call. If `None`, the method calls `get_stats()` internally. |
Return dict:
| Key | Type | Description |
|---|---|---|
| `ok` | `bool` | `True` only when `health_status == "HEALTHY"` |
| `health_status` | `str` | `"HEALTHY"` / `"DEGRADED"` / `"FAULT"` |
| `active_mode` | `str` | Raw mode name from the API |
| `is_tou_mode` | `bool` | Whether the gateway is currently in TOU mode |
| `sync_status` | `str` | `"SYNC_OK"` / `"SYNC_PENDING"` / `"SYNC_ALERT"` |
| `tou_send_status` | `int \| None` | Raw `touSendStatus` value |
| `tou_alert_message` | `str` | Raw `touAlertMessage` value (empty string if `null`) |
| `run_status` | `int \| None` | Raw `run_status` (0=Standby, 1=Charging, 2=Discharging) |
| `run_status_label` | `str` | Human label: `"Standby"` / `"Charging"` / `"Discharging"` |
| `active_block` | `dict \| None` | The schedule block that should be executing right now |
| `expected_dispatch` | `str \| None` | What the schedule says the battery should be doing |
| `fault_reasons` | `list[str]` | Non-empty list of reasons when DEGRADED or FAULT |
Health verdict rules:
| Condition | Verdict |
|---|---|
| Not in TOU mode | FAULT |
| No active schedule block for current time | DEGRADED |
| `run_status` matches expected dispatch | HEALTHY |
| `run_status` is Standby when dispatch expects action | FAULT |
| `run_status` is wrong direction | DEGRADED |
| `touSendStatus=3` (SYNC_ALERT), any verdict | Caps to DEGRADED minimum |
| `touSendStatus=2` (SYNC_PENDING), otherwise HEALTHY | Caps to DEGRADED |
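The rule table above can be re-derived in a few lines, which is useful for unit-testing your own alerting logic against the library's verdicts. A sketch: the mapping of which dispatch names imply charging (1) versus discharging (2) is an illustrative assumption, not the library's internal table, and `tou_verdict` is a hypothetical helper:

```python
def tou_verdict(is_tou_mode, active_block, run_status, expected_dispatch, tou_send_status):
    """Re-derive the health verdict from the rule table above.

    run_status: 0=Standby, 1=Charging, 2=Discharging.
    The expected_dispatch -> run_status mapping below is an assumption
    for illustration, using dispatch names from this cookbook.
    """
    if not is_tou_mode:
        return "FAULT"
    if active_block is None:
        return "DEGRADED"
    expected_run = {"GRID_CHARGE": 1, "SOLAR_CHARGE": 1, "GRID_EXPORT": 2}.get(expected_dispatch)
    if expected_run is None or run_status == expected_run:
        verdict = "HEALTHY"
    elif run_status == 0:
        verdict = "FAULT"        # standby when the schedule expects action
    else:
        verdict = "DEGRADED"     # moving, but in the wrong direction
    if tou_send_status in (2, 3) and verdict == "HEALTHY":
        verdict = "DEGRADED"     # unsettled sync caps an otherwise-healthy verdict
    return verdict

print(tou_verdict(True, {"mode": "discharge"}, 0, "GRID_EXPORT", None))  # FAULT
```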
When to call it:
```python
# ✅ CORRECT – on-demand, triggered by a user action or scheduled alert
if user_clicked_health_check_button:
    health = await client.get_tou_health(live_stats=cached_stats)

# ✅ CORRECT – once per poll cycle, for the specific gateway showing symptoms
# (e.g. run_status=0 during an expected discharge window)
if run_status == 0 and expected_dispatch in ("GRID_EXPORT", "GRID_CHARGE"):
    health = await client.get_tou_health(live_stats=current_stats)

# ❌ WRONG – never call in a tight loop or across every gateway on every poll
for gw in all_gateways:
    health = await client.get_tou_health()  # adds 2-3 extra API calls per gateway
```
reset_tou_mode() – Recovery (Write Operation)¶
Mixin: `DevicesMixin` | Calls: `set_mode()` × 2 + `get_gateway_tou_list()` × N
Performs a controlled Self-Consumption → Time-of-Use mode toggle. This causes the gateway firmware to re-read its local TOU schedule database from scratch, resolving the "stuck in standby" condition. After the toggle, it polls `getGatewayTouListV2` at 15-second intervals to confirm `touSendStatus` returns to `null`.
> [!CAUTION]
> This is a write operation that temporarily changes the operating mode. It MUST only be called after presenting the fault to the user and receiving explicit confirmation. Never call it automatically. The FHAI endpoint requires `{"confirmed": true}` in the POST body for this reason.
```python
result = await client.reset_tou_mode(
    min_soc_pct=10,          # Default: 10% – SOC guard, rejects reset if battery too low
    max_verify_attempts=4,   # Default: 4 – max polls for gateway ACK
    verify_interval_s=15,    # Default: 15s – delay between polls (mirrors mobile app)
)
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `min_soc_pct` | `int` | `10` | Minimum battery SOC% before allowing the reset. Prevents accidental discharge on a critically low battery. |
| `max_verify_attempts` | `int` | `4` | Maximum number of `getGatewayTouListV2` polls (4 × 15s = 60s total window). |
| `verify_interval_s` | `int` | `15` | Seconds between verification polls. Matches the mobile app's observed post-apply polling cadence. |
Return dict:
| Key | Type | Description |
|---|---|---|
| `ok` | `bool` | `True` if the mode toggle succeeded (regardless of sync ACK) |
| `sync_cleared` | `bool` | `True` if `touSendStatus` returned to `null` within the retry window |
| `final_send_status` | `int \| None` | Last observed `touSendStatus` after all retries |
| `final_alert_message` | `str` | Last observed `touAlertMessage` (empty string if `null`) |
| `steps` | `list[str]` | Full step-by-step log of every action taken |
| `error` | `str \| None` | Set only on fatal failure (SOC guard rejected, mode switch failed) |
ok vs sync_cleared:
- `ok=True, sync_cleared=True` – Full success. Reset applied, gateway confirmed.
- `ok=True, sync_cleared=False` – Reset applied but no ACK in the retry window. May be a false positive. Monitor `run_status` to verify physical execution.
- `ok=False` – Fatal: SOC guard rejected the reset, or a mode switch API call failed. Check `error` for the reason.
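The three outcomes above reduce to a small decision helper, useful when surfacing the result in a UI or alert. A sketch; the key names mirror the documented return dict, while `reset_followup` itself is an illustrative function, not part of the library:

```python
def reset_followup(result: dict) -> str:
    """Translate a reset result dict into the operator action suggested
    by the three documented outcomes above."""
    if not result.get("ok"):
        return f"fatal: {result.get('error')}"
    if result.get("sync_cleared"):
        return "done"
    return "monitor run_status"  # applied, but ACK unconfirmed

print(reset_followup({"ok": True, "sync_cleared": False, "final_send_status": 2}))
# monitor run_status
```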
Full Usage Example¶
```python
import asyncio

from franklinwh_cloud import FranklinWHCloud


async def check_and_recover_tou():
    client = FranklinWHCloud.from_config("franklinwh.ini")
    await client.login()
    await client.select_gateway()

    # ── Step 1: Get current stats once (reuse for health check) ──
    stats = await client.get_stats()
    live = {
        "battery_soc": stats.current.soc,
        "run_status": stats.current.run_status,
        "grid_power": stats.current.grid_power,
        "battery_power": stats.current.battery_power,
    }

    # ── Step 2: Run health check (on-demand only) ──
    health = await client.get_tou_health(live_stats=live)
    print(f"TOU Health: {health['health_status']}")
    print(f"  Sync: {health['sync_status']} (touSendStatus={health['tou_send_status']})")
    print(f"  Mode: {health['active_mode']} (TOU={health['is_tou_mode']})")
    print(f"  Battery: {health['run_status_label']} (expected: {health['expected_dispatch']})")
    if health['fault_reasons']:
        print("  Faults:")
        for r in health['fault_reasons']:
            print(f"    • {r}")

    if health['ok']:
        print("✅ Gateway is executing schedule correctly.")
        return

    # ── Step 3: Gate the recovery behind explicit confirmation ──
    print(f"\n⚠️ Health verdict: {health['health_status']}")
    confirm = input("Run TOU mode reset? This will toggle Self-Consumption→TOU. (yes/no): ")
    if confirm.strip().lower() != "yes":
        print("Reset cancelled.")
        return

    # ── Step 4: Execute recovery ──
    print("\n🔄 Running TOU mode reset...")
    result = await client.reset_tou_mode(
        min_soc_pct=10,
        max_verify_attempts=4,   # up to 60 s of polling
        verify_interval_s=15,    # 15 s between polls (mobile app cadence)
    )
    print("\nReset steps:")
    for step in result['steps']:
        print(f"  {step}")

    if not result['ok']:
        print(f"\n❌ Reset failed: {result['error']}")
        return

    if result['sync_cleared']:
        print("\n✅ Reset complete – gateway ACK confirmed (touSendStatus=null).")
    else:
        print(
            f"\n⚠️ Reset sent but sync unconfirmed "
            f"(touSendStatus={result['final_send_status']}). "
            f"This may be a false positive. Monitor run_status to verify."
        )

    # ── Step 5: Recheck health after recovery ──
    await asyncio.sleep(5)
    stats2 = await client.get_stats()
    live2 = {"run_status": stats2.current.run_status}
    health2 = await client.get_tou_health(live_stats=live2)
    print(f"\n🔄 Post-reset health: {health2['health_status']} – battery={health2['run_status_label']}")


asyncio.run(check_and_recover_tou())
```
Expected output – stuck gateway recovered:

```text
TOU Health: FAULT
  Sync: SYNC_OK (touSendStatus=None)
  Mode: Ausgrid EA11 TOU (TOU=True)
  Battery: Standby (expected: GRID_EXPORT)
  Faults:
    • Gateway in TOU mode but run_status=Standby during a GRID_EXPORT block.
      Expected: battery discharging to grid.

⚠️ Health verdict: FAULT
Run TOU mode reset? This will toggle Self-Consumption→TOU. (yes/no): yes

🔄 Running TOU mode reset...

Reset steps:
  SOC guard passed: battery at 72%.
  Step 1: set_mode(Self-Consumption) → {'code': 200, ...}
  Step 2: 3 s pause complete.
  Step 3: set_mode(Time-of-Use) sent – polling for gateway ACK.
  Step 4 attempt 1/4: waiting 15 s…
  Step 4 attempt 1: touSendStatus=2 (still pending/failed). Retrying…
  Step 4 attempt 2/4: waiting 15 s…
  Step 4 attempt 2: touSendStatus=null – gateway ACK received, sync confirmed.
  TOU mode reset complete – schedule sync confirmed.

✅ Reset complete – gateway ACK confirmed (touSendStatus=null).

🔄 Post-reset health: HEALTHY – battery=Discharging
```
Multi-Gateway Accounts – Scoping Rules¶
> [!WARNING]
> `get_tou_health()` and `reset_tou_mode()` operate on the currently selected gateway (`client.gateway`). For multi-gateway accounts, you must select the correct gateway before calling these methods.
```python
# ✅ CORRECT – select before calling
await client.select_gateway(gateway_id="10060006A0XXXXXXXXXX")
health = await client.get_tou_health()

# ❌ WRONG – never loop health checks across all gateways in a single cycle.
# This adds 3-5 extra API calls per gateway per poll. On a 4-gateway account
# polling every 60s, that is 12-20 unnecessary extra calls per minute.
for gw_id in all_gateway_ids:
    await client.select_gateway(gateway_id=gw_id)
    health = await client.get_tou_health()  # ← do not do this
```
Only run the health check for a specific gateway when there is a concrete signal that it may be stuck – e.g. `run_status=0` (Standby) during a time window where the schedule dictates charging or discharging.
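That symptom gate can be made explicit as a tiny predicate you run every poll cycle, so the expensive health check only fires when warranted. A sketch; the set of "active" dispatch names is an illustrative assumption drawn from this cookbook's tables, and `should_check_health` is a hypothetical helper:

```python
def should_check_health(run_status: int, expected_dispatch) -> bool:
    """Gate the expensive get_tou_health() call on a concrete symptom:
    the battery sits in Standby (run_status=0) during a window where the
    schedule expects charging or discharging.

    Dispatch names follow this cookbook's tables; adjust the set to
    match your schedule's actual values.
    """
    active_dispatches = {"GRID_EXPORT", "GRID_CHARGE", "SOLAR_CHARGE"}
    return run_status == 0 and expected_dispatch in active_dispatches

print(should_check_health(0, "GRID_EXPORT"))  # True
print(should_check_health(2, "GRID_EXPORT"))  # False
```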