
API Cookbook

[!CAUTION] API Maintenance & Schema Drift The franklinwh-cloud library identifies itself to the FranklinWH Cloud API using a softwareversion request header (default: APP2.4.1). This value is sent with every request for the lifetime of the session.

What this header does: It identifies the client as a known app version. Testing across versions APP1.0.0 through APP99.0.0 showed identical responses on all tested endpoints; the header does not appear to gate specific fields or alter payload structures in the current API. Its primary observed role is authentication token negotiation and likely server-side telemetry/analytics.

Schema drift detection: The library runs a built-in canary trap (Client._check_canary_trap) that scans every response for a softwareVersion field and fires a warning + disk dump if a version newer than the certified baseline (APP2.11.0) is detected. This is the real mechanism for detecting upstream API changes. Always reference docs/OPENAPI_GENERATOR.md if payloads shift unexpectedly.

To override the header for a single diagnostic call: franklinwh-cli fetch --app-version APP2.11.0 ... To set it globally: pass emulate_app_version="APP2.11.0" to Client(...) or PasswordAuth(...).
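A version check of this kind has to compare numerically rather than lexically: as plain strings, "APP2.4.1" sorts after "APP2.11.0". The sketch below only illustrates that idea. parse_app_version and is_newer_than_baseline are hypothetical helpers, not the library's Client._check_canary_trap implementation, and the "APP" + dotted-integers format is assumed from the examples in this document.

```python
import re

BASELINE = "APP2.11.0"  # certified baseline quoted in this cookbook


def parse_app_version(label: str) -> tuple:
    """Turn 'APP2.11.0' into (2, 11, 0). Hypothetical helper."""
    match = re.fullmatch(r"APP(\d+(?:\.\d+)*)", label)
    if match is None:
        raise ValueError(f"unrecognised version label: {label!r}")
    return tuple(int(part) for part in match.group(1).split("."))


def is_newer_than_baseline(label: str, baseline: str = BASELINE) -> bool:
    """True when `label` is a strictly newer version than `baseline`."""
    return parse_app_version(label) > parse_app_version(baseline)
```

Comparing integer tuples keeps 2.11 ahead of 2.4, which a raw string comparison would get wrong.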

Practical recipes for the FranklinWH Cloud API. Each recipe is copy-paste ready.

Prerequisites: See SANDBOX_SETUP.md for venv and credentials setup. Full method reference: See API_REFERENCE.md for all 70+ methods with args.


🚫 API Anti-Patterns & Polling Best Practices

Before building automated dashboards or backend integrators (like the FranklinWH Energy Manager), you must separate your polling loops into two distinct pipelines.

What NOT To Do: Do not poll static hardware data or compliance rules (get_connectivity_overview, get_device_info, getComplianceDetailById, get_smart_circuits_info) at the same frequency as power telemetry! Tying static fetches to your 5-10 second telemetry tick will aggressively throttle the aGate's internal MQTT relay, flood AWS with useless calls, and instantly cause DeviceTimeoutException API crashes.

What To Do (The Best Practice Architecture):

1. The Fast Loop (get_stats): Poll get_stats() rapidly (e.g., every 5-15 seconds) for real-time power flow, SoC levels, and grid states. This endpoint is highly optimized for frequency.
2. The Slow Loop (Static/Network Data): Poll static/config endpoints once on application startup, and then refresh them on a slow, lazy timer (e.g., every 15-60 minutes), or exclusively when a user clicks a manual "Refresh" button.
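One way to keep the two pipelines apart is a small time gate in front of the static fetch. This is a minimal sketch under stated assumptions: SlowLoopGate and poll are hypothetical names, and `client` is any object exposing async get_stats() and get_device_info() methods as named in this cookbook.

```python
import asyncio
import time


class SlowLoopGate:
    """Reports when slow-changing data is due for a refresh (hypothetical helper)."""

    def __init__(self, interval_s: float, clock=time.monotonic):
        self.interval_s = interval_s
        self._clock = clock
        self._last = None  # time of the last refresh, None before the first one

    def due(self) -> bool:
        now = self._clock()
        if self._last is None or now - self._last >= self.interval_s:
            self._last = now
            return True
        return False


async def poll(client, ticks: int, gate: SlowLoopGate, fast_interval_s: float = 10.0):
    """Run `ticks` fast-loop iterations; static data is fetched only when the gate opens."""
    for _ in range(ticks):
        if gate.due():
            await client.get_device_info()  # slow loop: static data
        await client.get_stats()            # fast loop: telemetry
        await asyncio.sleep(fast_interval_s)
```

With a 30-minute gate and a 10-second tick, the static endpoint is hit once at startup and then roughly once per 180 telemetry polls.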

Legacy Field Aliases (Relays)

The cloud API often exposes duplicated attributes via different payload structures. Specifically for hardware relays, the legacy gridRelayStat, oilRelayStat, and solarRelayStat (from get_power_info) perfectly duplicate the array main_sw (from get_device_composite_info runtimeData).

* gridRelayStat == main_sw[0]
* oilRelayStat == main_sw[1] (Generator)
* solarRelayStat == main_sw[2]

Recommendation: Always consume the curated client.get_stats() → Stats.current object, which evaluates and normalizes these aliases automatically under the hood without making excessive API calls.
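If you do have to work from a raw payload, the alias mapping above can be written down directly. legacy_relay_aliases is a hypothetical helper for illustration and is not part of the library.

```python
def legacy_relay_aliases(main_sw: list) -> dict:
    """Map the first three main_sw entries onto the legacy relay field names."""
    names = ("gridRelayStat", "oilRelayStat", "solarRelayStat")
    if len(main_sw) < len(names):
        raise ValueError("main_sw must contain at least three relay states")
    # zip stops at the shorter sequence, so extra main_sw entries are ignored
    return dict(zip(names, main_sw))
```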

Native Library Cache & Rate Limiting

The franklinwh-cloud library ships with built-in mechanisms to actively combat excessive polling and protect the fragile aGate MQTT boundaries. If your integrator tool (like an Admin Console) shows thousands of hits to static endpoints over just a few hours, your architecture is circumventing the internal cache boundaries!

1. The Method TTL Cache (Proactive) You can instruct the client to cache expensive, slow-changing static endpoints (like Smart Circuits or BMS data) natively for a fixed TTL. If you call get_smart_circuits_info() 10 times in a minute, only 1 actual API request will be sent to the cloud.

from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.cache import DEFAULT_CACHE

# Initialize with library-recommended TTL mapping
client = FranklinWHCloud("YOUR_EMAIL", "YOUR_PASSWORD", cache=DEFAULT_CACHE)

# Or override specific TTLs (seconds)
custom_cache = {
    **DEFAULT_CACHE,
    "get_bms_info": 120,    # Cache battery cell voltages for 2 mins
    "get_device_info": 600, # Cache hardware serials for 10 mins
}
client = FranklinWHCloud("YOUR_EMAIL", "YOUR_PASSWORD", cache=custom_cache)
(Any method that modifies state, e.g. set_smart_switch_state, automatically invalidates its relevant cache slot.)
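To make the caching behaviour concrete, here is a rough sketch of how a per-method TTL cache can work. It is an illustration only: MethodTTLCache is a hypothetical class, not the library's internal cache, and the library's real invalidation rules may differ.

```python
import time
from typing import Any, Callable, Dict, Tuple


class MethodTTLCache:
    """Caches results per method name for a fixed number of seconds."""

    def __init__(self, ttls: Dict[str, float], clock: Callable[[], float] = time.monotonic):
        self._ttls = ttls
        self._clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get_or_fetch(self, name: str, fetch: Callable[[], Any]) -> Any:
        ttl = self._ttls.get(name)
        now = self._clock()
        hit = self._store.get(name)
        if ttl is not None and hit is not None and now - hit[0] < ttl:
            return hit[1]            # still fresh: no upstream request
        value = fetch()              # expired, missing, or uncached method
        if ttl is not None:
            self._store[name] = (now, value)
        return value

    def invalidate(self, name: str) -> None:
        """Drop a cached entry, e.g. after a state-changing call."""
        self._store.pop(name, None)
```

Ten reads of a cached method inside its TTL result in a single call to `fetch`; a write path would call `invalidate` so the next read goes upstream again.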

2. Stale Data Degradation (Reactive) If the upstream FranklinWH Cloud encounters an outage or severely limits your account, you can enable tolerate_stale_data. The client will safely serve the last-known-good telemetry rather than throwing hard TimeoutExceptions.

client = FranklinWHCloud(
    email="YOUR_EMAIL", 
    password="YOUR_PASSWORD", 
    tolerate_stale_data=True, 
    stale_cache_ttl=300 # Data is considered "stale but usable" for 5 minutes
)

Always check client.metrics.snapshot() to audit your background thread discipline.
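The stale-data fallback described above can be pictured as "serve the last good value, but only for a bounded time". The following sketch assumes a synchronous fetch callable and uses the built-in TimeoutError as a stand-in for the library's own exceptions; StaleTolerantReader is a hypothetical name.

```python
import time


class StaleTolerantReader:
    """Serves the last successful value for up to stale_ttl_s seconds after a timeout."""

    def __init__(self, stale_ttl_s: float, clock=time.monotonic):
        self.stale_ttl_s = stale_ttl_s
        self._clock = clock
        self._last = None  # (timestamp, value) of the last successful read

    def read(self, fetch):
        """Return (value, is_stale); re-raise when no usable fallback exists."""
        try:
            value = fetch()
        except TimeoutError:
            if self._last is not None and self._clock() - self._last[0] <= self.stale_ttl_s:
                return self._last[1], True
            raise
        self._last = (self._clock(), value)
        return value, False
```

Returning the staleness flag alongside the value lets a dashboard mark the reading as degraded instead of silently presenting old data.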


🚦 Transport Architecture: REST GET vs MQTT Relay

Understanding the two transport paths in this library is essential before writing custom polling code or building your own API layer on top. The two mechanisms have fundamentally different cost and latency characteristics.

The Two Transport Paths

| Transport | Mechanism | Cost | CloudFront-cacheable | Who uses it |
|---|---|---|---|---|
| REST GET → cloud aggregator | HTTPS GET to /hes-gateway/terminal/getDeviceCompositeInfo | Cheapest: cloud assembles response from its DB | ✅ Yes | get_stats(), get_mode(), set_mode() |
| REST GET/POST → cloud REST API | Standard HTTPS to cloud REST endpoints | Low-medium | Partial | get_gateway_tou_list(), get_tou_info(), set_mode() POST |
| MQTT Relay → physical aGate | POST to /hes-gateway/terminal/sendMqtt with cmdType in body | Higher: cloud relays over MQTT to aGate hardware, awaits response | ❌ No | get_power_info() (211), get_smart_circuits_info() (311), _switch_usage() (353) |

[!IMPORTANT] The MQTT relay path is a physical hardware round-trip. Every sendMqtt call crosses: Cloud → AWS IoT → aGate firmware → back. This is not a DB lookup. Overusing MQTT relay calls on fast poll cycles is the primary cause of DeviceTimeoutException failures.

What the Schema 203/, 211/, 311/ Source Prefixes Mean

The franklinwh-cli schema command's Source column uses cmdType/sub-object notation. These prefixes describe which command produced the field; they are not transport cost labels.

| Source prefix | Transport path | When it fires |
|---|---|---|
| 203/runtimeData, 203/result | REST GET → getDeviceCompositeInfo | Every get_stats() call: the main poll (cheapest) |
| 211/result | MQTT Relay → get_power_info() (cmdType 211) | Only when get_stats(include_electrical=True) |
| 311/runtimeData, 311/sw_data | MQTT Relay → _switch_usage() (cmdType 353) | Only when smart circuits are active (pro_load[] non-zero) |
| derived | Local computation | Free, no API call |
| get_tou_info | REST GET → TOU schedule endpoint | Only when get_mode() is in TOU mode |

[!NOTE] 203/runtimeData is not a sendMqtt + cmdType 203 call. The private _status() method (which does use sendMqtt with cmdType 203) is a legacy low-level method not called by get_stats(). The getDeviceCompositeInfo REST GET is the production path.

The get_stats() Call Tree

get_stats()
  │
  ├─ get_device_composite_info()      ← 1x REST GET: all 203/ fields, cheap + CloudFront-cached
  │    └─ result: runtimeData, currentWorkMode, solarHaveVo, deviceStatus, alarms, relays
  │
  ├─ get_operating_mode_name()        ← dict lookup (OPERATING_MODES const), FREE
  │
  ├─ [conditional] _switch_usage()   ← MQTT relay cmdType 353, only if pro_load[] non-zero
  │    └─ triggers: smart circuit SW1/SW2/V2L present and active in runtimeData.pro_load
  │
  ├─ [conditional] get_grid_status() ← REST GET, only if relay OPEN or offgridreason set
  │    └─ triggers: ~0.1% of normal polls (relay open or off-grid flag)
  │
  └─ [conditional] get_power_info()  ← MQTT relay cmdType 211, only if include_electrical=True
       └─ triggers: caller opts in explicitly (FHAI uses every Nth poll, not every tick)

Normal poll (grid-connected, no smart circuits, no electrical): 1 REST GET. That's it.
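The branching in the call tree can be restated as a small planning function, which is handy for estimating the cost of a poll before making it. This is a sketch based only on the conditions listed above. planned_calls is a hypothetical helper, and the assumption that pro_load, main_sw, and offgridreason all sit directly in runtimeData is mine.

```python
def planned_calls(runtime_data: dict, include_electrical: bool = False) -> list:
    """List the upstream calls one get_stats() poll would need, per the tree above."""
    calls = ["get_device_composite_info"]          # always: one REST GET
    if any(runtime_data.get("pro_load") or []):
        calls.append("_switch_usage")              # MQTT relay, cmdType 353
    main_sw = runtime_data.get("main_sw") or [1]
    if main_sw[0] == 0 or runtime_data.get("offgridreason"):
        calls.append("get_grid_status")            # REST GET, rare
    if include_electrical:
        calls.append("get_power_info")             # MQTT relay, cmdType 211
    return calls
```

For a grid-connected site with no active smart circuits, the function returns a single entry, matching the "1 REST GET" case.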

The composite_hint Pattern: Avoiding Redundant Fetches

When your code calls get_stats() and then immediately calls get_mode() or set_mode(), you are paying for getDeviceCompositeInfo twice: once inside get_stats() and once inside the mode call. Use composite_hint to eliminate the duplicate:

# ❌ Naive pattern: double REST GET
stats = await client.get_stats()       # calls getDeviceCompositeInfo internally
mode  = await client.get_mode()        # calls getDeviceCompositeInfo AGAIN

# ✅ Shared-cycle pattern: reuse a composite you already hold
composite = await client.get_device_composite_info()
stats = await client.get_stats()        # NOTE: get_stats() still fetches the composite
                                        # internally; the saving is that get_mode()
                                        # below adds no further request
mode  = await client.get_mode(composite_hint=composite)

# ✅ Simplest optimised pattern for a combined mode+stats read: single REST GET
composite = await client.get_device_composite_info()
mode  = await client.get_mode(composite_hint=composite)
# stats fields available directly in composite["result"]["runtimeData"]

The composite_hint accepts the full response dict returned by get_device_composite_info() (i.e. {"code": 200, "result": {...}, "message": "success"}). When provided, get_mode() and set_mode() skip their internal getDeviceCompositeInfo call entirely.

[!TIP] composite_hint is keyword-only (after a bare *) so it cannot be passed positionally. Existing call sites using positional args are 100% backward-compatible with zero changes.
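The effect of a keyword-only hint is easy to see with a stand-in class. SketchClient below is hypothetical and only mimics the documented behaviour (skip the fetch when a hint is supplied). It is not the library's Client, and reading the mode from result.currentWorkMode is an assumption based on the field list in the call tree.

```python
import asyncio
from typing import Optional


class SketchClient:
    """Hypothetical stand-in that counts composite fetches."""

    def __init__(self):
        self.composite_fetches = 0

    async def get_device_composite_info(self) -> dict:
        self.composite_fetches += 1
        return {"code": 200, "result": {"currentWorkMode": 2}, "message": "success"}

    async def get_mode(self, *, composite_hint: Optional[dict] = None) -> int:
        # The bare * makes composite_hint keyword-only.
        composite = composite_hint
        if composite is None:
            composite = await self.get_device_composite_info()
        return composite["result"]["currentWorkMode"]


async def demo():
    client = SketchClient()
    composite = await client.get_device_composite_info()
    mode = await client.get_mode(composite_hint=composite)
    return mode, client.composite_fetches
```

Running demo() performs one fetch in total; calling get_mode({}) positionally raises TypeError, which is what protects existing positional call sites from accidental misuse.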

Why You Should Not Write Your Own Transport Layer

The library's transport paths have substantial hardening you would need to replicate:

  • Auto-retry with token refresh: instrumented_retry transparently catches 401/code 10009 responses, negotiates a new JWT via TokenFetcher, and replays the original request.
  • MQTT multiplexing guard: sendMqtt calls cannot be concurrent, because the aGate MQTT broker cannot multiplex simultaneous requests. The library sequences all sendMqtt calls carefully (e.g. get_bms_info() type 2 then type 3 are sequential by design).
  • Stale data fallback: get_stats() returns the last-known-good Stats object when the cloud returns an empty result: null glitch payload, a real observed cloud bug (not a crash).
  • CloudFront edge tracking: Every HTTP response is inspected for X-Cache / x-amz-cf-pop headers to maintain a live EdgeTracker PoP map for diagnostic telemetry.
  • Canary trap: Every response is checked for an unrecognised softwareVersion field. If a new firmware version is detected, the payload is dumped to disk for schema diffing.

Reimplementing direct sendMqtt or getDeviceCompositeInfo calls outside this library means none of the above protections apply to those calls.
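To give a sense of what the multiplexing guard involves, the sketch below serialises concurrent relay requests with an asyncio.Lock. It illustrates the general technique only: SerialisedRelay is a hypothetical class, the sleep stands in for the hardware round-trip, and the library's actual sequencing mechanism is not documented here.

```python
import asyncio


class SerialisedRelay:
    """Allows only one in-flight relay request at a time."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.in_flight = 0
        self.max_in_flight = 0

    async def send(self, cmd_type: int) -> dict:
        async with self._lock:
            self.in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self.in_flight)
            await asyncio.sleep(0.01)  # stand-in for the aGate round-trip
            self.in_flight -= 1
            return {"cmdType": cmd_type}


async def demo():
    relay = SerialisedRelay()
    # Three callers fire at once; the lock forces them through one by one.
    results = await asyncio.gather(relay.send(211), relay.send(311), relay.send(353))
    return relay.max_in_flight, [r["cmdType"] for r in results]
```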


🔌 Grid Connection State

[!IMPORTANT] GridConnectionState replaces the old grid_outage: bool field (removed 2026-04-10). Any integrator reading stats.current.grid_outage must migrate to stats.current.grid_connection_state.

The GridConnectionState enum provides unambiguous, four-state grid reporting covering all real-world FranklinWH site topologies: grid-tied homes, off-grid sites, active outages, and user-initiated simulation tests.

The Four States

| Value | .value (str) | Meaning | When you see it |
|---|---|---|---|
| CONNECTED | "Connected" | Grid relay CLOSED, utility power available | Normal daily operation |
| OUTAGE | "Outage" | Firmware detected grid loss (offGridFlag=1) | Real grid failure, island mode |
| NOT_GRID_TIED | "NotGridTied" | Site has no utility connection, permanent island | Off-grid solar/battery installs |
| SIMULATED_OFF_GRID | "SimulatedOffGrid" | User-initiated island test | Commissioning, testing, drills |

Detection Strategy (Zero-Overhead on Normal Systems)

The library derives state from data already fetched by get_stats(). No extra API calls are made on a normally-connected system:

startup:   get_entrance_info() → gridFlag=False → NOT_GRID_TIED cached forever (never re-checked)

per poll:
  offGridFlag == 1    → OUTAGE              (short-circuit, no extra call)
  main_sw[0] == 1     → CONNECTED           (no extra call, covers 99.9% of polls)
  main_sw[0] == 0  ─┐
  offgridreason != 0 ─┼→ get_grid_status() → offgridState==1 → SIMULATED_OFF_GRID
                      └→                   → offgridState==0 → OUTAGE

[!NOTE] The dual-gate (main_sw[0]==0 OR offgridreason!=null) handles a known firmware API reporting lag (~5–10s) where offgridreason is set before main_sw updates after a simulated off-grid activation. This is expected vendor behaviour: the grid contactor is a mechanical relay. The library does not retry; it returns the correct state on the first poll where API data is internally consistent.
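The detection order can be expressed as a pure function, which also makes the dual-gate easy to unit-test. This is one reading of the strategy above, not the library's code: GridState is a hypothetical stand-in for GridConnectionState, classify is a hypothetical helper, and treating "relay closed but offgridreason set" as needing the extra get_grid_status() call is my interpretation of the dual-gate note.

```python
from enum import Enum


class GridState(Enum):  # hypothetical stand-in for GridConnectionState
    CONNECTED = "Connected"
    OUTAGE = "Outage"
    NOT_GRID_TIED = "NotGridTied"
    SIMULATED_OFF_GRID = "SimulatedOffGrid"


def classify(grid_tied, off_grid_flag, main_sw0, offgridreason, fetch_offgrid_state):
    """Derive the grid state; fetch_offgrid_state is only called on the rare path."""
    if not grid_tied:
        return GridState.NOT_GRID_TIED       # decided once at startup
    if off_grid_flag == 1:
        return GridState.OUTAGE              # short-circuit, no extra call
    if main_sw0 == 1 and not offgridreason:
        return GridState.CONNECTED           # the common case, no extra call
    # Relay open OR off-grid reason set: one extra call disambiguates.
    if fetch_offgrid_state() == 1:
        return GridState.SIMULATED_OFF_GRID
    return GridState.OUTAGE
```

Passing the extra lookup in as a callable keeps the function free of I/O, so the normal path can be shown to make no additional request.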

Basic Usage

import asyncio
from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.models import GridConnectionState

async def main():
    client = FranklinWHCloud.from_config("franklinwh.ini")
    await client.login()
    await client.select_gateway()

    stats = await client.get_stats()
    state = stats.current.grid_connection_state

    # .value gives the display string directly
    print(f"Grid: {state.value}")
    # Output: "Connected" | "Outage" | "SimulatedOffGrid" | "NotGridTied"

    # Exact identity check
    if state == GridConnectionState.CONNECTED:
        print("✅ Grid available - normal operation")
    elif state == GridConnectionState.OUTAGE:
        print("❌ Grid outage - running on battery + solar")
    elif state == GridConnectionState.SIMULATED_OFF_GRID:
        print("⚡ Simulated off-grid - user-initiated island test")
    elif state == GridConnectionState.NOT_GRID_TIED:
        print("🏝️  Off-grid site - no utility connection")

asyncio.run(main())

State-Gated Automation (Safe Pattern)

Gate grid-dependent actions behind CONNECTED. This prevents dispatching grid charges or exports during outages, simulations, or on off-grid sites:

from franklinwh_cloud.models import GridConnectionState
from franklinwh_cloud.const import EMERGENCY_BACKUP

stats = await client.get_stats()
state = stats.current.grid_connection_state

if state == GridConnectionState.CONNECTED:
    # Safe to: import from grid, export to grid, run TOU schedules
    await client.set_tou_schedule(touMode="SELF")

elif state == GridConnectionState.OUTAGE:
    # Grid is down: switch to Emergency Backup to maximise reserve
    await client.set_mode(EMERGENCY_BACKUP, soc=100)
    print("🚨 Grid outage detected - Emergency Backup activated")

elif state == GridConnectionState.SIMULATED_OFF_GRID:
    # User is running an island test: take no automated action
    print("⚡ Simulation active - skipping scheduled dispatch")

elif state == GridConnectionState.NOT_GRID_TIED:
    # Permanent off-grid site: grid-dependent schedules never apply
    print("🏝️  Off-grid site - TOU schedule skipped")

Dashboard / Status Display

# Colour-coded terminal output (ANSI)
_COLORS = {
    GridConnectionState.CONNECTED:          "\033[32mConnected\033[0m",      # green
    GridConnectionState.OUTAGE:             "\033[31mOutage\033[0m",         # red
    GridConnectionState.SIMULATED_OFF_GRID: "\033[33mSimulated\033[0m",     # yellow
    GridConnectionState.NOT_GRID_TIED:      "\033[36mNot Grid-Tied\033[0m", # cyan
}

state = stats.current.grid_connection_state
print(f"Grid: {_COLORS.get(state, state.value)}")

# JSON / MQTT telemetry payload: .value is always a str
payload = {
    "grid_status": state.value,                          # "Connected" etc.
    "grid_ok":     state == GridConnectionState.CONNECTED,  # bool shortcut
    "solar_kw":    stats.current.solar_production,
    "battery_soc": stats.current.battery_soc,
}

FHAI / Home Assistant Integration

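The FHAI gateway service receives the flattened Current dataclass as a dict. Note that dataclasses.asdict() does not itself convert enum members; the field reads as its .value string in the dict (and in JSON output) only when GridConnectionState is a str-based enum, which the usage here assumes: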

import dataclasses
from franklinwh_cloud.models import GridConnectionState

stats = await client.get_stats()
d = dataclasses.asdict(stats.current)
# grid_connection_state is now the .value string in the dict

grid_status = d.get("grid_connection_state", "Connected")
# → "Connected" | "Outage" | "SimulatedOffGrid" | "NotGridTied"

status_payload = {
    "grid_status": grid_status,   # forward directly to HA sensor
    # ... other fields
}

[!TIP] FHAI handoff note: grid_connection_state is always a string after asdict(). No bool checks, no if grid_outage branches. Map each value to a Home Assistant sensor state directly, e.g. an HA sensor with device_class: enum and an options list.
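How an enum field behaves after dataclasses.asdict() depends on how the enum is declared, so it is worth checking before forwarding the dict as JSON. The sketch below uses two hypothetical stand-in enums (PlainState and StrState) and a hypothetical Reading dataclass; whether the library's GridConnectionState is str-based is an assumption, not something this example can confirm.

```python
import dataclasses
import json
from enum import Enum


class PlainState(Enum):          # plain enum: member stays an Enum object
    CONNECTED = "Connected"


class StrState(str, Enum):       # str-based enum: member is also a str
    CONNECTED = "Connected"


@dataclasses.dataclass
class Reading:
    grid_connection_state: Enum
    battery_soc: float


plain = dataclasses.asdict(Reading(PlainState.CONNECTED, 80.0))
mixed = dataclasses.asdict(Reading(StrState.CONNECTED, 80.0))


def flatten_enums(d: dict) -> dict:
    """Replace any Enum member in a flat dict with its .value."""
    return {k: (v.value if isinstance(v, Enum) else v) for k, v in d.items()}
```

With the str-based variant, json.dumps(mixed) works as-is and the field compares equal to "Connected". With the plain variant, json.dumps(plain) raises TypeError until the dict is passed through flatten_enums.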

Live Integration Test

A destructive live test verifying the complete state cycle is included:

# Requires: franklinwh.ini with real credentials, SOC ≥ reserve + 10%
pytest -m "live and destructive" tests/test_live.py::TestLiveGridConnectionState -s -v

Pre-flight checks enforced: SOC margin, current connection state, terminal yes confirmation. Guarantees: try/finally restore, poll-loop on reconnect (30s timeout).


⚙️ Operating Mode & Run Status: Field Guide

[!WARNING] FranklinWH Naming Collision. The runtimeData payload contains two similarly-named integer fields with completely different semantics. This distinction has been misread repeatedly in earlier work. Read this section carefully before writing any mode-display or VPP detection logic.

The Two Raw Fields: What They Actually Mean

The raw API response (getDeviceCompositeInfo, cmdType 203, result.runtimeData) includes:

| Raw API key | Python field | Semantics | Live examples |
|---|---|---|---|
| runtimeData.run_status | run_status (int) | RUN_STATUS key: what the battery hardware is physically doing. Maps directly to RUN_STATUS dict. | 1=Charging, 2=Discharging |
| runtimeData.mode | tou_mode (int) | Programme/schedule ID: an arbitrary backend ID for the active programme. NOT a RUN_STATUS key. VPP happens to use ID 9; normal TOU programmes use large IDs like 29287. | 9=VPP, 29287=Ausgrid EA11 TOU |
| runtimeData.name | tou_mode_desc (str) | Programme label: the human-readable name for the active programme. Often empty during VPP; always fall back to RUN_STATUS[9] in that case. | "Ausgrid EA11 TOU", "" |

[!CAUTION] The key mistake to avoid: runtimeData.mode is not a RUN_STATUS key. Doing RUN_STATUS[runtimeData.mode] will return "Unknown" for any normal TOU programme (e.g. 29287). Only runtimeData.run_status maps to RUN_STATUS. FranklinWH's naming is genuinely misleading.

RUN_STATUS Value Table (keyed by runtimeData.run_status, not runtimeData.mode)

from franklinwh_cloud.const import RUN_STATUS

# RUN_STATUS = {
#     0: "Standby",              # Inactive / idle
#     1: "Charging",
#     2: "Discharging",
#     3: "Unknown 3",            # Reserved
#     4: "Unknown 4",            # Reserved
#     5: "Off-Grid Standby",     # Island mode: battery idle, no grid
#     6: "Off-Grid Charging",    # Island mode: solar charging battery
#     7: "Off-Grid Discharging", # Island mode: battery powering home
#     8: "Debug Mode",           # Franklin Remote Support session active
#     9: "VPP mode",             # Virtual Power Plant: utility/aggregator controlled
# }

The Three Derived Fields on stats.current

| Python field | Derived from | What it shows |
|---|---|---|
| run_status_desc | RUN_STATUS[runtimeData.run_status] | What the battery is physically doing: "Charging", "Discharging", "Standby" |
| tou_mode_desc | runtimeData.name (raw) | Active programme label from the cloud: "Ausgrid EA11 TOU", "" during VPP |
| effective_mode | derived (priority order below) | App-matching dominant mode label: the single best label to show users |

effective_mode priority order:

1. tou_mode_desc if non-empty → e.g. "Ausgrid EA11 TOU", covers named programmes
2. RUN_STATUS[9] = "VPP mode" if tou_mode == 9 and name is empty → VPP fallback
3. work_mode_desc → "Time-Of-Use", "Self-Consumption", "Emergency Backup"
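The priority order translates into a three-step fallback. The function below is a sketch that mirrors the documented order for illustration. It is not the library's implementation, and the RUN_STATUS dict is copied from the table in this section so the example runs on its own.

```python
RUN_STATUS = {
    0: "Standby", 1: "Charging", 2: "Discharging",
    3: "Unknown 3", 4: "Unknown 4",
    5: "Off-Grid Standby", 6: "Off-Grid Charging", 7: "Off-Grid Discharging",
    8: "Debug Mode", 9: "VPP mode",
}


def effective_mode(tou_mode: int, tou_mode_desc: str, work_mode_desc: str) -> str:
    """Pick the dominant mode label using the documented priority order."""
    if tou_mode_desc:                 # 1. named programme wins
        return tou_mode_desc
    if tou_mode == 9:                 # 2. VPP fallback when the name is empty
        return RUN_STATUS[9]
    return work_mode_desc             # 3. base operating mode
```

Note that the programme ID is only ever compared against 9 here; it is never used as a RUN_STATUS key, which is the collision the section warns about.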

VPP Mode (Virtual Power Plant)

VPP mode means your gateway is under utility/aggregator control. The gateway dispatches autonomously in response to external signals, and your personal TOU schedule is suspended.

Detection:

stats = await client.get_stats()
cur = stats.current

# Recommended: use tou_mode (programme ID 9 = VPP)
is_vpp = cur.tou_mode == 9

# Or check the pre-derived effective_mode label
is_vpp = cur.effective_mode == "VPP mode"

What the FranklinWH mobile app shows during VPP:

● Discharging    ← runtimeData.run_status → RUN_STATUS[2] → hardware action
⊙ VPP Mode      ← runtimeData.name (or RUN_STATUS[9] fallback when name is empty)

The bullet-dot (●) line = what the battery is doing. The gear (⊙) line = what controls it.

stats = await client.get_stats()
cur = stats.current

# Single dominant label: matches the app's top-card display
print(f"Mode:     {cur.effective_mode}")
# → "Ausgrid EA11 TOU" | "VPP mode" | "Time-Of-Use" | "Self-Consumption"

# Hardware action: what the battery is physically doing right now
print(f"Action:   {cur.run_status_desc}")
# → "Charging" | "Discharging" | "Standby"

Exactly reproducing the app's two-line display:

from franklinwh_cloud.const import RUN_STATUS

cur = stats.current

# Line 1 (● bullet): hardware physical action
print(f"● {cur.run_status_desc}")    # RUN_STATUS[runtimeData.run_status]

# Line 2 (⊙ gear): controlling programme
print(f"⊙ {cur.effective_mode}")     # tou_mode_desc | "VPP mode" | work_mode_desc

Home Assistant / MQTT Payload

stats = await client.get_stats()
cur = stats.current

payload = {
    # Primary display sensor: matches the app's dominant label
    "effective_mode":   cur.effective_mode,
    # → "Ausgrid EA11 TOU" | "VPP mode" | "Time-Of-Use" | "Self-Consumption"

    # Hardware action sensor: what the battery is physically doing
    "hardware_action":  cur.run_status_desc,
    # → "Charging" | "Discharging" | "Standby"

    # Base operating mode (unchanged by VPP)
    "work_mode":        cur.work_mode_desc,    # "Time-Of-Use" | "Self-Consumption" | "Emergency Backup"
    "work_mode_int":    cur.work_mode,         # 1=TOU, 2=Self, 3=EmgBkp

    # Raw programme ID, useful for automation triggers
    "programme_id":     cur.tou_mode,          # runtimeData.mode: arbitrary int (9=VPP, 29287=Ausgrid etc.)
    "programme_label":  cur.tou_mode_desc,     # runtimeData.name: may be empty

    # Hardware state int, for history graphs
    "run_status_int":   cur.run_status,        # runtimeData.run_status: RUN_STATUS key

    # Convenience flag
    "vpp_active":       cur.tou_mode == 9,     # bool
}

[!TIP] When tou_mode_desc is empty (common during VPP): the library automatically falls back to RUN_STATUS[9] = "VPP mode" in effective_mode. You never need to null-check tou_mode_desc yourself; just read effective_mode.


🏛️ Roles & Responsibilities: Integrator vs Library

When integrating this library into an end-user application (like Home Assistant), you must maintain a strict conceptual boundary between what the library does and what your application is responsible for.

The Facade Pattern

The franklinwh-cloud library is a rigid facade. Its only job is to abstract away the undocumented, unstable, and volatile FranklinWH cloud endpoints so you never have to care if they rename an internal variable tomorrow. The library guarantees that stats.current.grid_relay will always be available, regardless of how many endpoints it had to secretly query to construct that value.

Accessory Impact & Upstream Limitations

The FranklinWH Cloud API is heavily un-optimized for systems with optional accessories. If an aGate has a Generator mapped or V2L enabled, the cloud backend physically requires the library to query secondary, non-cached endpoints (like cmdType 211) to assemble a complete telemetry snapshot.

This doubles or triples the API footprint per refresh tick. This is an upstream cloud limitation, not a library deficiency.

As the Integrator/App Developer, it is your responsibility to inform your users of the performance hit. Your application should proactively warn users: "Because you have a Generator installed your telemetry requires multiple API cycles; you may experience higher latency or aggressive rate limiting if poll rates are set too aggressively." Do not attempt to force the library to mask upstream latency.

Infinite Session Persistence (Transparent Auto-Renewal)

Users of the official FranklinWH Mobile App often experience "Idle Timeouts" or "Session Expired - Please log back in" prompts. The official app pushes the burden of session management onto the user.

By design, the franklinwh-cloud library entirely subverts this. The library embeds an instrumented_retry loop at the core HTTP boundaries. If the cloud servers invalidate the JWT token (HTTP 401 or Code 10009), the library will silently catch the rejection, automatically negotiate a new token via the TokenFetcher, and replay the original API request identically.

Downstream clients (like Home Assistant) therefore benefit from infinite session persistence and will never receive an expired session exception unless the underlying master credentials have been permanently revoked.
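The catch, refresh, and replay cycle described above follows a common decorator pattern. The sketch below shows that pattern in isolation. TokenExpired and retry_on_expired_token are hypothetical names standing in for the 401 / code 10009 condition and for the library's instrumented_retry, whose real signature is not shown in this document.

```python
import asyncio
import functools


class TokenExpired(Exception):
    """Hypothetical signal for an HTTP 401 / code 10009 response."""


def retry_on_expired_token(refresh, max_retries: int = 1):
    """Wrap an async call so an expired token triggers refresh() and a replay."""
    def decorator(call):
        @functools.wraps(call)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await call(*args, **kwargs)
                except TokenExpired:
                    if attempt == max_retries:
                        raise              # credentials are probably revoked
                    await refresh()        # negotiate a new token, then replay
        return wrapper
    return decorator


async def demo():
    state = {"token": "stale", "refreshes": 0}

    async def refresh():
        state["token"] = "fresh"
        state["refreshes"] += 1

    @retry_on_expired_token(refresh)
    async def get_stats():
        if state["token"] != "fresh":
            raise TokenExpired
        return {"soc": 80}

    return await get_stats(), state["refreshes"]
```

Bounding the retries matters: if the refresh itself cannot produce a valid token, the original error surfaces instead of looping forever.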

If your integration requires auditing these transparent rotations natively (e.g., to draw a "Session Uptime" metric on a dashboard), you can easily poll the built-in tracking metric:

# Returns elapsed seconds since the last silent JWT refresh, 
# or None if the original token is still valid.
s_ago = client.get_metrics().get("last_token_refresh_s_ago")

if s_ago is not None:
    print(f"Library transparently negotiated a fresh token {s_ago} seconds ago.")

Connection Preamble

All recipes start with establishing an authenticated session and binding a physical aGate serial number.

Modern Transparent Auth (The Future)

The modern Client boundary provides absolute control over the emulation footprint (e.g., passing a specific emulate_app_version API header) and decouples the authentication lifecycle from the command executor. This is the recommended approach for all new integrations.

import asyncio
from franklinwh_cloud.auth import PasswordAuth
from franklinwh_cloud.client import Client

async def main():
    # 1. Fetch token and dictate the exact mobile emulation string
    auth = PasswordAuth("user@example.com", "secret", emulate_app_version="APP2.11.0")
    await auth.get_token()

    # 2. Bind the active session to a specific physical aGate
    #    (Required for multi-aGate environments!)
    gateway_serial = "10060006AXXXXXXXXX" 
    client = Client(auth, gateway=gateway_serial, emulate_app_version="APP2.11.0")

    # ... your recipe code here ...
    stats = await client.get_stats()
    print(f"Battery SoC: {stats.current.battery_pct}%")

asyncio.run(main())

Legacy Wrapper (Single aGate Happy Path)

If you have an older script or only manage a single aGate on your account, the legacy FranklinWHCloud orchestrator will load your credentials (from the CLI or the .ini file when they are omitted) and auto-discover the serial number for you.

import asyncio
from franklinwh_cloud import FranklinWHCloud

async def main():
    # Will automatically fetch CLI or .ini credentials if omitted
    client = FranklinWHCloud(email="user@example.com", password="secret")
    await client.login()
    await client.select_gateway()   # Natively fetches and binds the first gateway it finds

    # ... your recipe code here ...
    stats = await client.get_stats()
    print(f"Battery SoC: {stats.current.battery_pct}%")

asyncio.run(main())

Multi-aGate Discovery (Account-Level APIs)

If you manage multiple gateways on a single account and don't know their serial numbers in advance, you MUST discover them before pushing commands. By instantiating a temporary proxy client, you can call the get_home_gateway_list() account API before touching hardware.

import asyncio
from franklinwh_cloud.auth import PasswordAuth
from franklinwh_cloud.client import Client

async def main():
    auth = PasswordAuth("user@example.com", "secret")

    # 1. Instantiate a proxy client to unlock Account-level APIs
    proxy = Client(auth, "placeholder")
    gateways_raw = await proxy.get_home_gateway_list()

    # 2. Iterate and bind explicitly
    for gw in gateways_raw.get("result", []):
        serial = gw.get("id")
        print(f"\n--- Binding to aGate {serial} ---")

        # 3. Create a dedicated client purely for this physical aGate 
        agate_client = Client(auth, gateway=serial)

        # Now hardware calls are safely routed to this target
        stats = await agate_client.get_stats()
        print(f"[{serial}] Battery SoC: {stats.current.battery_pct}%")

asyncio.run(main())

Gateway Groups: get_home_gateway_list() Group Fields

get_home_gateway_list() is the only API that returns group membership. The get_site_and_device_info() endpoint does NOT include group data; it only flat-lists gateways under a site.

Raw response fields (per gateway entry):

{
  "id":        "10060006A0XXXXXXXXXX",
  "name":      "FHP",
  "groupId":   null,
  "groupName": null,
  "groupFlag": 0
}

| Field | Type | Meaning |
|---|---|---|
| groupId | str \| null | UUID/int of the group. null = ungrouped |
| groupName | str \| null | Human label ("Main House"). null = ungrouped |
| groupFlag | int | 1 = this gateway belongs to a group; 0 = ungrouped |

[!IMPORTANT] groupFlag=0 is the authoritative "ungrouped" sentinel; do not infer grouping from groupId != null alone. Always check groupFlag == 1 first. Single-gateway accounts will have groupId: null, groupName: null, groupFlag: 0.

Pattern: building a group-aware account topology:

gateways_raw = await proxy.get_home_gateway_list()
gateways = gateways_raw.get("result", [])

# Only show group tier when at least one gateway is grouped
has_groups = any(gw.get("groupFlag") == 1 for gw in gateways)

# Build group buckets
group_buckets = {}   # groupId (or None) → [gateway_dict, ...]
group_names   = {}   # groupId → groupName

for gw in gateways:
    grp = gw.get("groupId") if gw.get("groupFlag") == 1 else None
    group_buckets.setdefault(grp, []).append(gw)
    if grp and grp not in group_names:
        group_names[grp] = gw.get("groupName") or f"Group {grp}"

if has_groups:
    for grp_id, members in group_buckets.items():
        label = group_names.get(grp_id, "(ungrouped)")
        print(f"Group: {label} (GroupId: {grp_id})")
        for gw in members:
            print(f"  └── {gw['name']} ({gw['id']})")
else:
    # Single-gateway or all ungrouped: no group header needed
    for gw in gateways:
        print(f"└── {gw['name']} ({gw['id']})")

Result for an account with one group and one ungrouped gateway:

Group: Main House (GroupId: 501)
  └── FHP1 (10060006AXXXXXXXXX)
  └── FHP2 (10060006AXXXXXXXXX)
Group: (ungrouped) (GroupId: None)
  └── FHP3 (10060006AXXXXXXXXX)

[!NOTE] Groups are a display/organisation concept only; they have no effect on API routing or command scoping. Each gateway always requires its own Client(auth, gateway=serial) instance regardless of group membership.

Custom Client Identity (HTTP Headers)

By default, the library sends a generic franklinwh-cloud-client User-Agent. If you are building an integration (e.g., a Home Assistant add-on or custom dashboard), you can declare your identity to FranklinWH's servers:

custom_headers = {
    "User-Agent": "HomeAssistant-Addon/1.0.0",
    "X-Client-Version": "1.0.0"
}

client = FranklinWHCloud(
    email="user@example.com",
    password="secret",
    client_headers=custom_headers
)

Or load from config file:

from franklinwh_cloud import FranklinWHCloud

client = FranklinWHCloud.from_config("franklinwh.ini")

Quick Reference

One-liner recipes. All assume client is connected (see preamble above).

Power Flow & Status

stats = await client.get_stats()

# Instantaneous power (kW)
solar_kw     = stats.current.solar_production    # p_sun
battery_kw   = stats.current.battery_use         # p_fhp (negative = charging)
grid_kw      = stats.current.grid_use            # p_uti
home_kw      = stats.current.home_load           # p_load
soc          = stats.current.battery_soc         # Battery %

# Operating state
mode_name    = stats.current.work_mode_desc      # "Self Consumption"
run_status   = stats.current.run_status_desc      # "Normal operation"

# Grid connection state (four-state enum; see GridConnectionState section below)
from franklinwh_cloud.models import GridConnectionState
grid_state   = stats.current.grid_connection_state  # GridConnectionState.CONNECTED
grid_label   = grid_state.value                      # "Connected" / "Outage" / ...
grid_ok      = grid_state == GridConnectionState.CONNECTED

# Daily totals (kWh)
solar_kwh    = stats.totals.solar                # kwh_sun
grid_in_kwh  = stats.totals.grid_import          # kwh_uti_in
grid_out_kwh = stats.totals.grid_export          # kwh_uti_out
bat_chg_kwh  = stats.totals.battery_charge       # kwh_fhp_chg
bat_dis_kwh  = stats.totals.battery_discharge    # kwh_fhp_di
home_kwh     = stats.totals.home_use             # kwh_load

Operating Mode & System Limiters (Global SOC Configs)

The getGatewayTouListV2 endpoint does more than serve TOU schedules: it also acts as a global Operating Mode Configuration blob. Recent updates have exposed several programmatic SOC (State of Charge) limiters:

  • soc: The global backup reserve buffer (Emergency Reserve).
  • maxSoc: The maximum allowed charging limit (e.g., protecting degraded cells).
  • complianceSoc: The regulatory/utility-mandated minimum reserve.
  • dischargeDepthSoc: The lowest permissible physical discharge limit (often 5% or 10%).
from franklinwh_cloud.const import (
    TIME_OF_USE, SELF_CONSUMPTION, EMERGENCY_BACKUP,  # 1, 2, 3
)

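# 1. View Global Configurations (Including SOC parameters)
# get_tou_dispatch_detail() returns the raw API envelope, so unwrap "result" first
# (assumption: the SOC fields sit directly under "result")
config = (await client.get_tou_dispatch_detail()).get("result", {})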

print(f"Current Reserve SOC: {config.get('soc')}%")
print(f"Maximum Charge Limit (maxSoc): {config.get('maxSoc', 'Unrestricted')}%")
print(f"Compliance Reserve (complianceSoc): {config.get('complianceSoc', 'None')}%")
print(f"Discharge Depth Threshold: {config.get('dischargeDepthSoc', 'None')}%")

# 2. Get current mode specifically
mode = await client.get_mode()
print(f"Mode: {mode['modeName']}, Run: {mode['run_desc']}")

# 3. Get reserve SoC for all modes
soc_all = await client.get_all_mode_soc()
# Returns: [{'workMode': 1, 'name': 'Time of Use', 'soc': 7.0, ...}, ...]

# 4. Switch to Self-Consumption, keep current SoC
await client.set_mode(SELF_CONSUMPTION, None, None, None, None)
#                     workMode=2        soc  forever nextMode duration

# 5. Switch to Emergency Backup (indefinite)
await client.set_mode(EMERGENCY_BACKUP, None, 1, SELF_CONSUMPTION, None)
#                     workMode=3        soc  forever=1 nextMode=2   duration

# 6. Switch to Emergency Backup for 2 hours, then revert to Self-Consumption
await client.set_mode(EMERGENCY_BACKUP, None, 2, SELF_CONSUMPTION, 120)
#                     workMode=3        soc  timed=2  nextMode=2    mins

# 7. Update backup reserve SoC to 20% for Self-Consumption mode
await client.update_soc(requestedSOC=20, workMode=SELF_CONSUMPTION)
#                                        workMode=2
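The four limiters overlap, so it can help to collapse them into a single usable window. The sketch below is one plausible reading, not documented behaviour: it assumes the highest of the three lower limits wins and that a missing maxSoc means charging to 100% is allowed. `effective_soc_window` is a hypothetical helper, not part of franklinwh-cloud.

```python
def effective_soc_window(config: dict) -> tuple[float, float]:
    """Return (floor, ceiling) in percent from a config dict.

    Assumption: the highest of soc / complianceSoc / dischargeDepthSoc is the
    effective floor, and an absent maxSoc means a ceiling of 100%.
    """
    lower_keys = ("soc", "complianceSoc", "dischargeDepthSoc")
    lowers = [float(config[key]) for key in lower_keys if config.get(key) is not None]
    floor = max(lowers) if lowers else 0.0
    ceiling = float(config["maxSoc"]) if config.get("maxSoc") is not None else 100.0
    return floor, ceiling


# Sample values are illustrative, not captured from a real gateway.
sample = {"soc": 20, "maxSoc": 90, "complianceSoc": 10, "dischargeDepthSoc": 5}
floor, ceiling = effective_soc_window(sample)
print(f"Usable window: {floor:.0f}% to {ceiling:.0f}%")
```

If the real firmware resolves conflicts between these limits differently, only the `max(...)` line needs to change.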

TOU Scheduling

[!CAUTION] Every set_tou_schedule / set_tou_schedule_multi call is DESTRUCTIVE. saveTouDispatch validates, saves, AND switches the gateway to TOU mode in one atomic call. There is no "save without activating" path. Always back up the current schedule with get_tou_dispatch_detail() before dispatching so you can restore it on completion.

from franklinwh_cloud.const import (
    dispatchCodeType,  # SELF=6, GRID_CHARGE=8, GRID_EXPORT=7, STANDBY=2 ...
    WaveType,          # OFF_PEAK=0, MID_PEAK=1, ON_PEAK=2, SUPER_OFF_PEAK=4
)

# ── View current schedule ──────────────────────────────────────────────────
schedule = await client.get_tou_dispatch_detail()
# Returns raw API envelope: {"code": 200, "result": {"template": {...}, "strategyList": [...]}}
strategy_list = schedule.get("result", {}).get("strategyList", [])

# ── Set full-day self-consumption ──────────────────────────────────────────
await client.set_tou_schedule(touMode="SELF")

Example 1 - Single Season: Grid Charge for 1 Hour

Use this pattern when you want to force a charge window for the current active season. The library resolves today's date to the correct season automatically and leaves all other seasons untouched. The rest of the 24-hour day is filled with default_mode automatically.

# ── Set 1-hour grid charge window for today's active season ───────────────
await client.set_tou_schedule(
    touMode="CUSTOM",
    touSchedule=[{
        "name": "Grid Charge",
        "startHourTime": "01:00",
        "endHourTime":   "02:00",
        "dispatchId": dispatchCodeType.GRID_CHARGE.value,   # 8 = charge from solar/grid
        "waveType":   WaveType.OFF_PEAK.value,              # 0 = off-peak rate tier
        "gridChargeMax": 5000,                              # Watts; cap at 5 kW
    }],
    default_mode="SELF",   # Rest of day = self-consumption (dispatchId=6)
    day_type=3,            # 3 = everyDay (weekday + weekend)
    # month=4,             # Optional: target April's season explicitly
)

# ── Caller is responsible for backup/restore if needed ────────────────────
# Backup:  original = (await client.get_tou_dispatch_detail())["result"]["strategyList"]
# Restore: await client.set_tou_schedule_multi(original)
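To make the "rest of the day is filled with default_mode" behaviour concrete, here is a minimal sketch of how such padding could work. It illustrates the idea only; the library's real fill logic is not shown in this cookbook, and `fill_day`, `to_minutes`, and `to_hhmm` are hypothetical names.

```python
def to_minutes(hhmm: str) -> int:
    """Convert "HH:MM" to minutes since midnight ("24:00" -> 1440)."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def to_hhmm(total_minutes: int) -> str:
    """Convert minutes since midnight back to "HH:MM"."""
    return f"{total_minutes // 60:02d}:{total_minutes % 60:02d}"


def fill_day(blocks: list[dict], default_dispatch_id: int = 6) -> list[dict]:
    """Pad the gaps around `blocks` so the result spans 00:00 to 24:00."""

    def filler(start: int, end: int) -> dict:
        return {
            "name": "Default",
            "startHourTime": to_hhmm(start),
            "endHourTime": to_hhmm(end),
            "dispatchId": default_dispatch_id,
            "waveType": 0,
        }

    filled, cursor = [], 0
    for block in sorted(blocks, key=lambda b: to_minutes(b["startHourTime"])):
        start = to_minutes(block["startHourTime"])
        if start < cursor:
            raise ValueError(f"Block at {block['startHourTime']} overlaps the previous block")
        if start > cursor:
            filled.append(filler(cursor, start))   # pad the gap before this block
        filled.append(block)
        cursor = to_minutes(block["endHourTime"])
    if cursor < 24 * 60:
        filled.append(filler(cursor, 24 * 60))     # pad the tail of the day
    return filled


charge = {"name": "Grid Charge", "startHourTime": "01:00", "endHourTime": "02:00",
          "dispatchId": 8, "waveType": 0}
full_day = fill_day([charge])
for block in full_day:
    print(block["startHourTime"], block["endHourTime"], block["dispatchId"])
```

A one-hour charge window therefore becomes three blocks: self-consumption before, the charge window, and self-consumption after.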

Dispatch codes confirmed from mobile app HAR captures:

| dispatchId | Code | Meaning | Use when |
|---|---|---|---|
| 1 | F | aPower to home | Battery powers home; solar exports |
| 2 | B | aPower on standby | Battery idles; solar/grid handles home |
| 3 | E | aPower charges from solar | Solar charges battery; grid covers home |
| 6 | D | Self-consumption | Normal: solar → battery → grid priority |
| 7 | H | aPower to home/grid | Battery discharges to home + exports excess |
| 8 | G | aPower charges from solar/grid | Force charge from both solar and grid |

[!WARNING] FranklinWH has changed dispatchId numbering in past API updates. Always verify against a live franklinwh-cli tou --dispatch output before hardcoding IDs. The Code letter (B/D/E/F/G/H) is a secondary cross-reference from getCustomEnergyDispatchList.

Example 2 - Multi-Season: Weekday/Weekend Split

Use set_tou_schedule_multi() when you need to write a full multi-season TOU schedule with different weekday and weekend profiles per season.

[!NOTE] This is also the correct restore path after a dispatch. Save the full strategyList before dispatching and call set_tou_schedule_multi to restore it exactly, preserving season names, month ranges, and day-type structures.
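Because a crash between dispatch and restore would lose an in-memory backup, it may be worth persisting the saved strategyList to disk first. The sketch below assumes the envelope shape described earlier (`{"code": 200, "result": {"strategyList": [...]}}`); the helper names and file name are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def save_strategy_backup(detail: dict, path: Path) -> int:
    """Write the strategyList from a get_tou_dispatch_detail() envelope to disk."""
    strategy_list = detail.get("result", {}).get("strategyList", [])
    if not strategy_list:
        raise ValueError("No strategyList in response; refusing to write an empty backup")
    path.write_text(json.dumps(strategy_list, indent=2))
    return len(strategy_list)


def load_strategy_backup(path: Path) -> list:
    """Read a saved strategyList, ready to pass to set_tou_schedule_multi()."""
    return json.loads(path.read_text())


# Stand-in for `await client.get_tou_dispatch_detail()`; values are illustrative.
detail = {"code": 200, "result": {"strategyList": [
    {"seasonName": "Summer", "month": "10,11,12,1,2,3", "dayTypeVoList": []},
]}}

with tempfile.TemporaryDirectory() as tmp:
    backup_path = Path(tmp) / "tou_backup.json"
    season_count = save_strategy_backup(detail, backup_path)
    restored = load_strategy_backup(backup_path)

print(f"Backed up and reloaded {season_count} season(s)")
```

In a real script the backup would live somewhere durable rather than in a temporary directory, so it survives a restart.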

# ── Build a two-season weekday/weekend schedule ───────────────────────────
summer_weekday_blocks = [
    {"name": "Off-peak",  "startHourTime": "00:00", "endHourTime": "07:00",
     "dispatchId": 6,  "waveType": 0},   # Self-consumption overnight
    {"name": "Charge",    "startHourTime": "07:00", "endHourTime": "10:00",
     "dispatchId": 8,  "waveType": 0},   # Charge from cheap morning solar
    {"name": "Solar",     "startHourTime": "10:00", "endHourTime": "17:00",
     "dispatchId": 3,  "waveType": 0},   # Closes the 10:00-17:00 gap (assumed: charge from solar, as on weekends)
    {"name": "On-peak",   "startHourTime": "17:00", "endHourTime": "21:00",
     "dispatchId": 7,  "waveType": 2},   # Discharge to grid during peak
    {"name": "Off-peak",  "startHourTime": "21:00", "endHourTime": "24:00",
     "dispatchId": 6,  "waveType": 0},   # Self-consumption late night
]

summer_weekend_blocks = [
    {"name": "Off-peak",  "startHourTime": "00:00", "endHourTime": "10:00",
     "dispatchId": 6,  "waveType": 0},   # Sleep in: self-consumption
    {"name": "Solar",     "startHourTime": "10:00", "endHourTime": "17:00",
     "dispatchId": 3,  "waveType": 0},   # Charge from daytime solar
    {"name": "On-peak",   "startHourTime": "17:00", "endHourTime": "21:00",
     "dispatchId": 7,  "waveType": 2},   # Discharge to grid during peak
    {"name": "Off-peak",  "startHourTime": "21:00", "endHourTime": "24:00",
     "dispatchId": 6,  "waveType": 0},
]

winter_blocks = [
    {"name": "Off-peak",  "startHourTime": "00:00", "endHourTime": "06:00",
     "dispatchId": 8,  "waveType": 0},   # Cheap overnight grid charge
    {"name": "Self",      "startHourTime": "06:00", "endHourTime": "16:00",
     "dispatchId": 6,  "waveType": 0},   # Self-consumption during day
    {"name": "Peak",      "startHourTime": "16:00", "endHourTime": "21:00",
     "dispatchId": 7,  "waveType": 2},   # Peak export
    {"name": "Off-peak",  "startHourTime": "21:00", "endHourTime": "24:00",
     "dispatchId": 6,  "waveType": 0},
]

strategy_list = [
    {
        "id": None,
        "seasonName": "Summer",
        "month": "10,11,12,1,2,3",    # Oct–Mar
        "templateId": None,
        "dayTypeVoList": [
            {
                "dayName": "weekDay", "dayType": 1,   # 1 = Mon–Fri
                "detailVoList": summer_weekday_blocks,
                "eleticRateValley": 0.08, "eleticSellValley": 0.05,
                "eleticRatePeak": None, "eleticRateSharp": None,
                "eleticRateShoulder": None, "eleticRateSuperOffPeak": None,
                "eleticSellPeak": None, "eleticSellSharp": None,
                "eleticSellShoulder": None, "eleticSellSuperOffPeak": None,
                "eleticRateGridFee": None,
            },
            {
                "dayName": "weekendDay", "dayType": 2,  # 2 = Sat–Sun
                "detailVoList": summer_weekend_blocks,
                "eleticRateValley": 0.08, "eleticSellValley": 0.05,
                "eleticRatePeak": None, "eleticRateSharp": None,
                "eleticRateShoulder": None, "eleticRateSuperOffPeak": None,
                "eleticSellPeak": None, "eleticSellSharp": None,
                "eleticSellShoulder": None, "eleticSellSuperOffPeak": None,
                "eleticRateGridFee": None,
            },
        ],
    },
    {
        "id": None,
        "seasonName": "Winter",
        "month": "4,5,6,7,8,9",        # Apr–Sep
        "templateId": None,
        "dayTypeVoList": [
            {
                "dayName": "everyDay", "dayType": 3,  # 3 = all days
                "detailVoList": winter_blocks,
                "eleticRateValley": 0.06, "eleticSellValley": 0.03,
                "eleticRatePeak": None, "eleticRateSharp": None,
                "eleticRateShoulder": None, "eleticRateSuperOffPeak": None,
                "eleticSellPeak": None, "eleticSellSharp": None,
                "eleticSellShoulder": None, "eleticSellSuperOffPeak": None,
                "eleticRateGridFee": None,
            },
        ],
    },
]

await client.set_tou_schedule_multi(strategy_list)

[!TIP] Reading back what you wrote: after set_tou_schedule_multi, run franklinwh-cli tou or call get_tou_dispatch_detail() to verify the gateway received the schedule. Cloud-to-gateway sync takes ~30–60 seconds, so allow that long before treating a mismatch as a failure.
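Since every write is destructive, a local pre-flight check on the strategy list can catch obvious mistakes before anything is sent. The sketch below assumes three rules: each day type must cover 00:00 to 24:00 without gaps, block edges sit on 30-minute boundaries, and months 1-12 are each assigned to exactly one season. Only the 30-minute rule is stated in this cookbook; the other two are assumptions, and `find_schedule_problems` is a hypothetical helper.

```python
def _minutes(hhmm: str) -> int:
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def find_schedule_problems(strategy_list: list[dict]) -> list[str]:
    """Return human-readable problems; an empty list means no issue was found."""
    problems = []
    months_seen = []
    for season in strategy_list:
        season_name = season.get("seasonName", "?")
        months_seen += [int(m) for m in season["month"].split(",")]
        for day in season["dayTypeVoList"]:
            label = f"{season_name}/{day.get('dayName', '?')}"
            cursor = 0   # minute of the day that the next block must start at
            for block in day["detailVoList"]:
                start = _minutes(block["startHourTime"])
                end = _minutes(block["endHourTime"])
                if start != cursor:
                    problems.append(f"{label}: gap or overlap before {block['startHourTime']}")
                if start % 30 or end % 30:
                    problems.append(f"{label}: {block['startHourTime']}-{block['endHourTime']} is off the 30-minute grid")
                cursor = end
            if cursor != 24 * 60:
                problems.append(f"{label}: schedule stops at minute {cursor}, not 24:00")
    if sorted(months_seen) != list(range(1, 13)):
        problems.append("months 1-12 are not each assigned to exactly one season")
    return problems


def season(months: str, spans: list[tuple[str, str]]) -> dict:
    """Build a minimal single-day-type season for the demo."""
    blocks = [{"startHourTime": s, "endHourTime": e} for s, e in spans]
    return {"seasonName": "Demo", "month": months,
            "dayTypeVoList": [{"dayName": "everyDay", "detailVoList": blocks}]}


ALL_MONTHS = "1,2,3,4,5,6,7,8,9,10,11,12"
ok = find_schedule_problems([season(ALL_MONTHS, [("00:00", "24:00")])])
gap = find_schedule_problems([season(ALL_MONTHS, [("00:00", "07:00"), ("10:00", "24:00")])])
print(ok)
print(gap)
```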

Dispatch code reference: see Dispatch Code Reference below.

Power Control (PCS)

from franklinwh_cloud.models import GridStatus, GridConnectionState

# Get current grid import/export limits
pcs = await client.get_power_control_settings()

# Set grid export max to 5 kW, import unlimited
await client.set_power_control_settings(
    globalGridDischargeMax=5.0,   # export limit kW (-1=unlimited, 0=disabled)
    globalGridChargeMax=-1,       # import limit kW (-1=unlimited, 0=disabled)
)

# Go off-grid (simulate outage; opens grid contactor)
# NOTE: this changes grid_connection_state → SIMULATED_OFF_GRID
await client.set_grid_status(GridStatus.OFF, soc=5)
#                            GridStatus.OFF=2  minimum SoC before auto-restore

# Restore grid connection
await client.set_grid_status(GridStatus.NORMAL)
#                            GridStatus.NORMAL=0

Devices & BMS

# Get aGate info (firmware, serial)
agate = await client.get_agate_info()

# Get aPower battery info (capacity, serial)
apower = await client.get_apower_info()

# Get BMS cell data for a specific battery
bms = await client.get_bms_info("APOWER_SERIAL_NUMBER")

# Get smart circuit states
circuits = await client.get_smart_circuits_info()

# Toggle smart switch 1 ON, switch 2 OFF, switch 3 unchanged
await client.set_smart_switch_state((True, False, None))

# Get relay states + grid voltage/current/frequency
power_info = await client.get_power_info()

# Device discovery: structured snapshot of entire system
snapshot = await client.discover(tier=2)  # tier 1=basic, 2=verbose, 3=pedantic
print(f"aGate: {snapshot.agate.model}, aPowers: {snapshot.batteries.count}")

LED Strip

# Get current LED settings
led = await client.led_light_settings(mode="1", dataArea={})

# Turn LED on with colour and brightness (aPower 2/S)
await client.led_light_settings(mode="2", dataArea={
    "lightStat": 2,              # 1=Off, 2=On
    "rgb": "FF6600",             # Hex colour
    "bright": 80,                # Brightness 0-100
    "timeEn": 1,                 # 0=No schedule, 1=Schedule enabled
    "lightOpenTime": "06:00",
    "lightCloseTime": "22:00",
})

Smart Assistant (AI)

# Get example queries
examples = await client.smart_assistant(requestType="1")  # 1=list examples

# Ask a question
answer = await client.smart_assistant(requestType="2", query="What is my battery level?")
#                                     requestType="2"  # 2=ask question
print(answer)

⚠️ Some AI commands may only execute on the mobile app.

Monitoring Network Connectivity

Instead of relying solely on tracking network_connection via get_stats(), you can pull a definitive snapshot of the active connections and their IPs using get_connectivity_overview().

[!TIP] Best Practice for API Clients / UI Dashboards: To minimize polling overhead on the hardware, call the default (essential) view periodically (e.g. every 5 minutes / on startup). Only pass deep_scan=True if you explicitly need to re-verify the SPAN integration or ping the local Modbus TCP 502 port.

# 1. Essential View (Fast, lightweight polling for UIs)
# Fetches active/backup links, AWS cloud connection, and router status.
net = await client.get_connectivity_overview()

primary = net["primary"]
print(f"Cloud Connected: {net['cloud_connected']}")
print(f"Primary Link:    {primary['name']} (ID: {primary['id']})")
print(f"Gateway & IP:    IP: {primary['ip']}, Gateway: {primary['gateway']}")

for backup in net["backups"]:
    print(f"Backup Link:     {backup['name']} (ID: {backup['id']})")

# 2. Deep Diagnostic View (Slower, use only when necessary)
# Pings Modbus 502 on the local IP and checks external SPAN flags.
deep_net = await client.get_connectivity_overview(deep_scan=True)

if deep_net["modbus_tcp_502_open"]:
    print("Modbus polling is available locally!")
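The two-pipeline advice (fast telemetry, slow static data) can be sketched with two independent asyncio loops. The fetchers below are fakes standing in for `client.get_stats()` and `client.get_connectivity_overview()`, and the intervals are shortened and bounded so the demo finishes quickly; `poll` is a hypothetical helper.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def poll(fetch: Callable[[], Awaitable[object]], interval_s: float,
               sink: list, max_cycles: Optional[int] = None) -> None:
    """Call `fetch` every `interval_s` seconds and append each result to `sink`.

    With max_cycles=None the loop runs until the task is cancelled.
    """
    cycle = 0
    while max_cycles is None or cycle < max_cycles:
        sink.append(await fetch())
        cycle += 1
        await asyncio.sleep(interval_s)


async def demo() -> tuple[list, list]:
    telemetry, static = [], []

    async def fake_get_stats():            # stands in for client.get_stats()
        return "stats"

    async def fake_get_connectivity():     # stands in for client.get_connectivity_overview()
        return "connectivity"

    # Real intervals would be on the order of 10 s and 300 s.
    await asyncio.gather(
        poll(fake_get_stats, 0.01, telemetry, max_cycles=6),
        poll(fake_get_connectivity, 0.05, static, max_cycles=1),
    )
    return telemetry, static


telemetry, static = asyncio.run(demo())
print(f"{len(telemetry)} telemetry polls, {len(static)} connectivity poll")
```

Keeping the loops separate means a slow or failing static fetch never delays the telemetry tick.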

Historical Energy Data

from datetime import date

# Today's energy breakdown
day = await client.get_power_details(type=1, timeperiod="2026-03-18")
#                                   type=1  # DAY: hourly breakdown

# This week
week = await client.get_power_details(type=2, timeperiod="2026-03-18")
#                                     type=2  # WEEK: daily breakdown

# This month
month = await client.get_power_details(type=3, timeperiod="2026-03-01")
#                                      type=3  # MONTH: daily breakdown

# This year
year = await client.get_power_details(type=4, timeperiod="2026-01-01")
#                                     type=4  # YEAR: monthly breakdown

# Lifetime totals
lifetime = await client.get_power_details(type=5, timeperiod=str(date.today()))
#                                         type=5  # LIFETIME: all-time
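The dates above are hardcoded. A small helper can derive them from any given day instead. This sketch mirrors the pattern in the examples (month and year anchored to their first day) and assumes the API accepts those anchors for any date; `period_args` is a hypothetical helper.

```python
from datetime import date


def period_args(today: date) -> dict[str, tuple[int, str]]:
    """Map each period name to the (type, timeperiod) pair used by get_power_details()."""
    return {
        "day": (1, today.isoformat()),
        "week": (2, today.isoformat()),
        "month": (3, today.replace(day=1).isoformat()),
        "year": (4, today.replace(month=1, day=1).isoformat()),
        "lifetime": (5, today.isoformat()),
    }


args = period_args(date(2026, 3, 18))
for name, (period_type, timeperiod) in args.items():
    print(f"{name:8s} type={period_type} timeperiod={timeperiod}")
```

A call would then look like `await client.get_power_details(type=period_type, timeperiod=timeperiod)`.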

Weather & Storm Hedge

weather = await client.get_weather()
storms = await client.get_storm_settings()

# Enable Storm Hedge, 60 min advance backup
await client.set_storm_settings(
    stormEn=1,                  # 0=Disabled, 1=Enabled
    setAdvanceBackupTime=60,    # Minutes before storm to switch to backup
)

Account & Notifications

# List all gateways on account
gateways = await client.get_home_gateway_list()

# Get unread notification count
unread = await client.get_unread_count()

# Get recent notifications
notes = await client.get_notifications(pageNum=1, pageSize=20)

# Get warranty info
warranty = await client.get_warranty_info()

# Site info (user ID, roles, distributor)
site = await client.siteinfo()

Diagnostics & Metrics

The franklinwh-cloud client tracks detailed telemetry about every API call, retry, and HTTP connection it makes. You can pull this snapshot at any time to build API health dashboards.

# Get a realtime snapshot of API client health
metrics = client.get_metrics()

# The snapshot contains detailed routing and latency info:
print(f"Total API Calls: {metrics['uptime']['total_requests']}")
print(f"CloudFront Edge: {metrics['edge']['last_pop']}")
print(f"Average Latency: {metrics['timing']['avg_ms']:.0f} ms")

# Endpoint specific hit-counts
for ep, hits in metrics['endpoints'].items():
    print(f"  {ep}: {hits} calls")

# Error tracking and token refreshes
print(f"Parse Errors: {metrics['errors']['parse']}")
print(f"Auth Refreshes: {metrics['uptime']['token_refreshes']}")
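For a health dashboard, the snapshot usually needs reducing to a few headline numbers. The sketch below works on a plain dict with the keys used above (`uptime.total_requests`, `errors.parse`, `endpoints`); the sample values and endpoint names are made up, and `summarise_metrics` is a hypothetical helper.

```python
def summarise_metrics(metrics: dict, top_n: int = 3) -> dict:
    """Reduce a metrics snapshot to a parse-error rate and the busiest endpoints."""
    total = metrics["uptime"]["total_requests"]
    parse_errors = metrics["errors"]["parse"]
    busiest = sorted(metrics["endpoints"].items(), key=lambda item: item[1], reverse=True)
    return {
        "parse_error_rate": parse_errors / total if total else 0.0,
        "busiest": busiest[:top_n],
    }


# Illustrative snapshot; real key names under "endpoints" may differ.
sample = {
    "uptime": {"total_requests": 200, "token_refreshes": 1},
    "errors": {"parse": 4},
    "endpoints": {"get_stats": 150, "get_mode": 30, "get_device_info": 20},
}
summary = summarise_metrics(sample, top_n=2)
print(f"Parse error rate: {summary['parse_error_rate']:.1%}")
for endpoint, hits in summary["busiest"]:
    print(f"  {endpoint}: {hits} calls")
```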

Full Example: System Dashboard with SoC Estimation

A complete script that displays power flow, operating state, and estimates time to full charge or reserve SoC.

import asyncio
from franklinwh_cloud import FranklinWHCloud

RESERVE_SOC = 20    # Your backup reserve %

async def main():
    client = FranklinWHCloud.from_config("franklinwh.ini")
    await client.login()
    await client.select_gateway()

    # Get dynamic battery capacity directly from the API
    device_info = await client.get_device_info()
    battery_kwh = device_info.get("result", {}).get("totalCap", 13.6)

    stats = await client.get_stats()
    c = stats.current
    t = stats.totals

    # ── Power Flow ──
    print("⚡ Power Flow (kW)")
    print(f"  Solar:   {c.solar_production:6.2f} kW")
    print(f"  Battery: {c.battery_use:6.2f} kW  {'⬇ charging' if c.battery_use < 0 else '⬆ discharging'}")
    print(f"  Grid:    {c.grid_use:6.2f} kW")
    print(f"  Home:    {c.home_load:6.2f} kW")
    print()

    # ── Status ──
    print(f"🔋 Battery: {c.battery_soc:.0f}%")
    print(f"📋 Mode:    {c.work_mode_desc}")
    print(f"🏃 Status:  {c.run_status_desc}")
    print(f"🔌 Grid:    {c.grid_connection_state.value}")
    print()

    # ── SoC Time Estimation ──
    bat_kw = abs(c.battery_use)
    if bat_kw > 0.05:  # ignore noise below 50W
        current_kwh = (c.battery_soc / 100) * battery_kwh
        if c.battery_use < 0:  # Charging
            remaining_kwh = battery_kwh - current_kwh
            hours = remaining_kwh / bat_kw
            h, m = int(hours), int((hours % 1) * 60)
            print(f"⏱️  Estimated ~{h}h {m}m to 100%  (at {bat_kw:.1f} kW)")
        else:  # Discharging
            usable_kwh = current_kwh - (RESERVE_SOC / 100) * battery_kwh
            if usable_kwh > 0:
                hours = usable_kwh / bat_kw
                h, m = int(hours), int((hours % 1) * 60)
                print(f"⏱️  Estimated ~{h}h {m}m to reserve ({RESERVE_SOC}%)  (at {bat_kw:.1f} kW)")
            else:
                print(f"⚠️  Battery at or below reserve ({RESERVE_SOC}%)")
    else:
        print("⏸️  Battery idle")

    print()

    # ── Daily Totals ──
    print("📊 Today (kWh)")
    print(f"  Solar:      {t.solar:6.1f}")
    print(f"  Grid in:    {t.grid_import:6.1f}")
    print(f"  Grid out:   {t.grid_export:6.1f}")
    print(f"  Bat charge: {t.battery_charge:6.1f}")
    print(f"  Bat disc:   {t.battery_discharge:6.1f}")
    print(f"  Home:       {t.home_use:6.1f}")

asyncio.run(main())

Expected output:

⚡ Power Flow (kW)
  Solar:     4.20 kW
  Battery:  -2.10 kW  ⬇ charging
  Grid:      0.00 kW
  Home:      2.10 kW

🔋 Battery: 72%
📋 Mode:    Self Consumption
🏃 Status:  Normal operation
🔌 Grid:    Connected

⏱️  Estimated ~1h 48m to 100%  (at 2.1 kW)

📊 Today (kWh)
  Solar:       18.3
  Grid in:      2.1
  Grid out:     0.0
  Bat charge:   8.4
  Bat disc:     5.2
  Home:        12.0
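The estimation arithmetic is easier to unit-test when pulled out of the script into a pure function. The sketch below follows the same convention as the dashboard (negative battery power means charging) and the same 50 W noise floor; `estimate_minutes` is a hypothetical helper, and the 13.6 kWh capacity is the fallback value used in the script.

```python
from typing import Optional


def estimate_minutes(soc_pct: float, battery_kw: float, capacity_kwh: float,
                     reserve_pct: float = 20.0, noise_kw: float = 0.05) -> Optional[int]:
    """Minutes until full (charging) or until reserve (discharging).

    Returns None when the battery is idle or already at/below reserve.
    """
    rate_kw = abs(battery_kw)
    if rate_kw <= noise_kw:
        return None                                   # idle: nothing to estimate
    stored_kwh = soc_pct / 100 * capacity_kwh
    if battery_kw < 0:                                # charging towards 100%
        energy_kwh = capacity_kwh - stored_kwh
    else:                                             # discharging towards reserve
        energy_kwh = stored_kwh - reserve_pct / 100 * capacity_kwh
        if energy_kwh <= 0:
            return None
    return int(energy_kwh / rate_kw * 60)


minutes = estimate_minutes(soc_pct=72, battery_kw=-2.1, capacity_kwh=13.6)
print(f"~{minutes // 60}h {minutes % 60}m to 100%")
```

This is a linear estimate; real charge rates taper near 100%, so treat the result as a lower bound when charging.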


Full Example: Force Charge via Custom TOU

A production-ready script that demonstrates the complete lifecycle: save state → configure PCS → set schedule → monitor → restore.

Phase 1: PCS Preamble (Check Limits & Battery Capacity)

Before dispatching, ensure the PCS (Power Control System) allows grid charging/discharging at the desired power levels, and check battery capacity to calculate target SoC.

import asyncio
from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.const import (
    TIME_OF_USE, SELF_CONSUMPTION, EMERGENCY_BACKUP,
    dispatchCodeType, WaveType,
)
from franklinwh_cloud.models import GridStatus, GridConnectionState  # GridConnectionState is used in Phase 3

# ── Configuration ──
CHARGE_START = "11:30"
CHARGE_END   = "15:00"
TARGET_SOC   = 95.0       # Stop monitoring when SoC reaches this %
POLL_INTERVAL = 60        # Seconds between monitoring polls
DISPATCH      = dispatchCodeType.GRID_CHARGE   # 8 = charge from solar+grid
WAVE_TYPE     = WaveType.OFF_PEAK              # 0 = off-peak pricing tier


async def main():
    client = FranklinWHCloud.from_config("franklinwh.ini")
    await client.login()
    await client.select_gateway()

    # ── 1a. Save original state (for restore later) ──
    original_mode = await client.get_mode()
    original_mode_id = original_mode.get("workMode", SELF_CONSUMPTION)
    original_mode_name = original_mode.get("modeName", "?")
    original_schedule = await client.get_tou_dispatch_detail()
    print(f"📋 Saved state - Mode: {original_mode_name} (workMode={original_mode_id})")

    # ── 1b. Check battery capacity & inverter limits via API ──
    device_info = await client.get_device_info()
    result_data = device_info.get("result", {})

    # We use hardcoded fallback values (e.g. 13.6 kWh, 5.0 kW) purely as a safety net
    # to prevent mathematical division-by-zero crashes in the rare event of an API failure.
    # The Cloud API should realistically never return these as null.
    battery_count = len(result_data.get("apowerList", [])) or 1
    total_capacity_kwh = result_data.get("totalCap", 13.6)
    nameplate_power_kw = result_data.get("totalPower", 5.0 * battery_count)

    print(f"🔋 Batteries: {battery_count} = {total_capacity_kwh:.1f} kWh")
    print(f"⚡ Nameplate Inverter Max Power: {nameplate_power_kw:.1f} kW continuous")

    stats = await client.get_stats()
    current_soc = stats.current.battery_soc
    print(f"🔋 Current SoC: {current_soc:.0f}%  →  Target: {TARGET_SOC:.0f}%")

    if current_soc >= TARGET_SOC:
        print("✅ Already at target SoC; nothing to do.")
        return

    # ── 1c. Check and set PCS limits ──
    pcs = await client.get_power_control_settings()
    charge_limit = pcs.get("result", {}).get("globalGridChargeMax", -1)
    discharge_limit = pcs.get("result", {}).get("globalGridDischargeMax", -1)
    print(f"⚡ PCS limits - Grid charge: {charge_limit} kW, Grid discharge: {discharge_limit} kW")
    print(f"   (-1 = unlimited, 0 = disabled)")

    if charge_limit == 0:
        print("⚠️  Grid charging is DISABLED (0 kW). Enabling unlimited...")
        await client.set_power_control_settings(
            globalGridChargeMax=-1,        # -1 = unlimited
            globalGridDischargeMax=discharge_limit,  # keep existing
        )
        print("✓ Grid charging enabled")

    # ── 1d. Ensure we are in TOU mode ──
    if original_mode_id != TIME_OF_USE:
        print(f"🔄 Switching to TOU mode (was {original_mode_name})...")
        await client.set_mode(TIME_OF_USE, None, None, None, None)
        await asyncio.sleep(3)
        print("✓ Now in TOU mode")

Phase 2: Submit Schedule with Error Handling

Handle common errors: invalid time format, bad dispatch codes, and API failures.

    # ── 2. Set charge schedule with error handling ──
    print(f"\n⏱️  Setting grid charge window {CHARGE_START}–{CHARGE_END}...")

    try:
        result = await client.set_tou_schedule(
            touMode="CUSTOM",
            touSchedule=[{
                "name": "Grid Charge",
                "startHourTime": CHARGE_START,      # same block keys as Example 1
                "endHourTime": CHARGE_END,
                "dispatchId": DISPATCH.value,       # dispatchCodeType.GRID_CHARGE = 8
                "waveType": WAVE_TYPE.value,      # WaveType.OFF_PEAK = 0
            }],
            default_mode="SELF",   # Outside window = self-consumption (dispatchId=6)
        )
    except ValueError as e:
        # set_tou_schedule validates times, dispatch codes, and JSON structure
        # Common errors:
        #   - Invalid time: "25:00" or "11:70" or missing startHourTime
        #   - Bad dispatch: dispatchId=99 (not in valid set 1,2,3,6,7,8)
        #   - Malformed JSON: missing required fields
        print(f"❌ Validation error: {e}")
        print("   Check: times must be HH:MM (00:00–24:00), 30-min boundaries")
        print("   Check: dispatchId must be one of: 1,2,3,6,7,8")
        return
    except Exception as e:
        # API-level errors (network, auth, server-side rejection)
        print(f"❌ API error: {type(e).__name__}: {e}")
        return

    # Check API response for success
    # (assumption: set_tou_schedule returns the same raw envelope as
    # get_tou_dispatch_detail, which reports "code": 200 on success)
    code = result.get("code")
    if code != 200:
        msg = result.get("msg", "Unknown error")
        print(f"❌ Server rejected schedule: code={code}, msg={msg}")
        return

    tou_id = result.get("result", {}).get("id", "?")
    print(f"✓ Schedule submitted - touId={tou_id}")

    # ── 2b. Verify schedule applied ──
    await asyncio.sleep(5)   # Give aGate time to apply

    detail = await client.get_tou_dispatch_detail()
    # Blocks are nested: result → strategyList → dayTypeVoList → detailVoList
    blocks = [
        block
        for season in detail.get("result", {}).get("strategyList", [])
        for day in season.get("dayTypeVoList", [])
        for block in day.get("detailVoList", [])
    ]
    print(f"\n📅 Active schedule ({len(blocks)} blocks):")
    for b in blocks:
        name = b.get("name", "?")
        start = b.get("startHourTime", "?")
        end = b.get("endHourTime", "?")
        wave = b.get("waveType", "?")
        print(f"  {start}–{end}  {name}  (waveType={wave})")

    if not blocks:
        print("⚠️  No dispatch blocks found; schedule may not have applied!")

Phase 3: Monitor Power Flow & SoC

Poll the system to confirm the dispatch is executing correctly: check that the operating mode is still TOU, that power is flowing in the expected direction, and whether the SoC target has been reached.

    # ── 3. Monitor loop ──
    print(f"\n🔍 Monitoring every {POLL_INTERVAL}s until SoC ≥ {TARGET_SOC:.0f}%...")
    print(f"   Press Ctrl+C to stop monitoring early.\n")

    try:
        while True:
            stats = await client.get_stats()
            c = stats.current

            soc = c.battery_soc
            bat_kw = c.battery_use       # negative = charging, positive = discharging
            grid_kw = c.grid_use         # positive = importing, negative = exporting
            solar_kw = c.solar_production
            mode_desc = c.work_mode_desc
            grid_status = c.grid_status

            # ── 3a. Mode check: still in TOU? ──
            if "Time of Use" not in mode_desc:
                print(f"⚠️  Mode changed to '{mode_desc}'; expected TOU!")
                print(f"    The system may have switched due to storm hedge or app override.")
                break

            # ── 3b. Grid check: still connected? ──
            grid_state = stats.current.grid_connection_state
            if grid_state != GridConnectionState.CONNECTED:
                print(f"⚠️  Grid state: {grid_state.value}; cannot charge from grid while not connected!")
                break

            # ── 3c. Power flow direction ──
            charging = bat_kw < -0.05   # Battery drawing > 50W = charging

            if DISPATCH == dispatchCodeType.GRID_CHARGE:
                # Grid charge: expect battery charging (bat_kw < 0) AND grid importing (grid_kw > 0)
                flow_ok = charging
                direction = "⬇ CHARGING" if charging else "⏸ IDLE/DISCHARGING"
                grid_dir = f"grid={'importing' if grid_kw > 0 else 'exporting'} {abs(grid_kw):.2f} kW"

            elif DISPATCH == dispatchCodeType.GRID_EXPORT:
                # Grid export: expect battery discharging (bat_kw > 0) AND grid exporting (grid_kw < 0)
                discharging = bat_kw > 0.05
                flow_ok = discharging
                direction = "⬆ DISCHARGING" if discharging else "⏸ IDLE/CHARGING"
                grid_dir = f"grid={'exporting' if grid_kw < 0 else 'importing'} {abs(grid_kw):.2f} kW"

            else:
                # Other dispatches (SELF, SOLAR, HOME, STANDBY)
                flow_ok = True
                direction = f"bat={bat_kw:+.2f} kW"
                grid_dir = f"grid={grid_kw:+.2f} kW"

            status_icon = "✅" if flow_ok else "⚠️"
            print(
                f"  {status_icon} SoC: {soc:5.1f}% | "
                f"Battery: {bat_kw:+6.2f} kW {direction} | "
                f"Solar: {solar_kw:.2f} kW | {grid_dir}"
            )

            # ── 3d. SoC target reached? ──
            if DISPATCH == dispatchCodeType.GRID_CHARGE and soc >= TARGET_SOC:
                print(f"\n🎯 Target SoC reached: {soc:.0f}% ≥ {TARGET_SOC:.0f}%")
                break

            if DISPATCH == dispatchCodeType.GRID_EXPORT and soc <= TARGET_SOC:
                # For export, TARGET_SOC is the minimum SoC before stopping
                print(f"\n🎯 Minimum SoC reached: {soc:.0f}% ≤ {TARGET_SOC:.0f}%")
                break

            await asyncio.sleep(POLL_INTERVAL)

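    except (KeyboardInterrupt, asyncio.CancelledError):
        # On Python 3.11+, asyncio.run() turns Ctrl+C into task cancellation,
        # so catch CancelledError as well to make sure Phase 4 still runs.
        print("\n⏹️  Monitoring stopped by user.")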

Phase 4: Restore Original State

Always restore the original TOU schedule and operating mode, even if monitoring was interrupted.

    # ── 4. Restore original state ──
    print(f"\n🔄 Restoring original state...")

    # 4a. Restore original TOU schedule
    try:
        orig_strategy_list = original_schedule.get("result", {}).get("strategyList", [])
        if orig_strategy_list:
            # Re-submit the saved strategyList unchanged, preserving season names,
            # month ranges, and day-type structures
            await client.set_tou_schedule_multi(orig_strategy_list)
            block_count = sum(
                len(day.get("detailVoList", []))
                for season in orig_strategy_list
                for day in season.get("dayTypeVoList", [])
            )
            print(f"✓ Restored original TOU schedule ({block_count} blocks)")
        else:
            # No saved seasons = was flat self-consumption
            await client.set_tou_schedule(touMode="SELF")
            print("✓ Restored to full-day self-consumption")
    except Exception as e:
        print(f"⚠️  Could not restore TOU schedule: {e}")
        print(f"    You may need to manually restore via the FranklinWH app.")

    # 4b. Restore original operating mode
    if original_mode_id != TIME_OF_USE:
        try:
            await client.set_mode(original_mode_id, None, None, None, None)
            print(f"✓ Restored operating mode to {original_mode_name} (workMode={original_mode_id})")
        except Exception as e:
            print(f"⚠️  Could not restore mode: {e}")

    # 4c. Final state confirmation
    await asyncio.sleep(3)
    final_mode = await client.get_mode()
    final_stats = await client.get_stats()
    print(f"\n📋 Final state:")
    print(f"   Mode: {final_mode.get('modeName', '?')}")
    print(f"   SoC:  {final_stats.current.battery_soc:.0f}%")
    print(f"   Grid: {final_stats.current.grid_status.name}")
    print("✅ Done!")

asyncio.run(main())

Expected output (grid charge scenario):

📋 Saved state - Mode: Self Consumption (workMode=2)
🔋 Batteries: 1 = 13.6 kWh
⚡ Nameplate Inverter Max Power: 5.0 kW continuous
🔋 Current SoC: 42%  →  Target: 95%
⚡ PCS limits - Grid charge: -1 kW, Grid discharge: -1 kW
   (-1 = unlimited, 0 = disabled)
🔄 Switching to TOU mode (was Self Consumption)...
✓ Now in TOU mode

⏱️  Setting grid charge window 11:30–15:00...
✓ Schedule submitted - touId=12345

📅 Active schedule (3 blocks):
  0:00–11:30   Self-consumption  (waveType=0)
  11:30–15:00  Grid charge       (waveType=0)
  15:00–24:00  Self-consumption  (waveType=0)

🔍 Monitoring every 60s until SoC ≥ 95%...
   Press Ctrl+C to stop monitoring early.

  ✅ SoC:  42.3% | Battery:  -4.80 kW ⬇ CHARGING | Solar: 2.10 kW | grid=importing 2.70 kW
  ✅ SoC:  48.7% | Battery:  -4.90 kW ⬇ CHARGING | Solar: 3.40 kW | grid=importing 1.50 kW
  ✅ SoC:  55.1% | Battery:  -5.00 kW ⬇ CHARGING | Solar: 4.20 kW | grid=importing 0.80 kW
  ...
  ✅ SoC:  94.8% | Battery:  -1.20 kW ⬇ CHARGING | Solar: 3.80 kW | grid=importing 0.00 kW

🎯 Target SoC reached: 95% ≥ 95%

🔄 Restoring original state...
✓ Restored original TOU schedule (3 blocks)
✓ Restored operating mode to Self Consumption (workMode=2)

📋 Final state:
   Mode: Self Consumption
   SoC:  95%
   Grid: NORMAL
✅ Done!

[!CAUTION] This script modifies your live aGate TOU schedule and operating mode. Always test during off-peak hours. The script saves and restores state, but if it crashes mid-execution, restore manually via the FranklinWH app.

See TOU_SCHEDULE_GUIDE.md for dispatch codes, known limitations, and the 30-minute boundary rule.


Smart Circuits & EV Charging

V2 Firmware Mutations & Compatibility

FranklinWH recently migrated Smart Circuit (Sw) payloads from V1 integer timers (Sw1OpenTime/Sw1CloseTime2) to V2 string arrays (time_enabled, time_schedules, time_set).

  • Impact: Schedules written with the V1 integer schema fail or are silently ignored by current aGates.
  • Reading: franklinwh-cloud parses the V2 payload into SmartCircuitDetail dataclasses for you. Timestamps appear as strings such as '2025-10-04 20:11'.
  • Writing: Until the V2 payload constructor is fully mapped, modify schedules manually, or control circuits with boolean switches (set_smart_switch_state) and amperage limits (set_smart_circuit_load_limit).
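
As a minimal sketch of the switch-based approach, the hypothetical helper below builds the three-element tuple that the EV charging recipe later in this section passes to set_smart_switch_state. It assumes three circuits per aGate and that None leaves a circuit untouched; both are inferred from that recipe, not confirmed against the API.

```python
def build_switch_tuple(circuit_id, state, circuit_count=3):
    """Return a tuple that sets one circuit and leaves the others as None.

    Hypothetical helper: circuit_count=3 and "None = unchanged" are assumptions.
    """
    if not 1 <= circuit_id <= circuit_count:
        raise ValueError(f"circuit_id must be between 1 and {circuit_count}")
    switches = [None] * circuit_count
    switches[circuit_id - 1] = bool(state)  # circuits are 1-indexed
    return tuple(switches)


print(build_switch_tuple(2, True))   # (None, True, None)
print(build_switch_tuple(1, False))  # (False, None, None)
```

With a logged-in client, the call would then read `await client.set_smart_switch_state(build_switch_tuple(2, True))`.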

Advanced Regional Discovery & Renaming

US and AU/EU markets have very different Smart Circuit topologies. US installations support multiple aGates chained together and "merged" Smart Circuits, while AU installations standardise on 3 physical outputs per aGate with V2L. You can parse the DeviceSnapshot tree to adapt your integration to the region:

import asyncio
from franklinwh_cloud import Client

async def diagnose_regional_smart_circuits():
    client = Client("user@example.com", "secret")
    await client.login()
    await client.select_gateway()

    # Tier 2 discovery loads all hardware quirks and accessories
    snapshot = await client.discover(tier=2)
    acc = snapshot.accessories

    # Check if this gateway supports the US "merge" functionality or AU V2L
    quirks = acc.get("gateway", {}).get("region_quirks", {})
    if quirks.get("supports_smart_circuit_merge"):
        print("🌍 US Region Detected: Application may contain merged Smart Circuits spanning multiple aGates.")
    elif quirks.get("supports_v2l"):
        print("🌍 AU Region Detected: Vehicle-to-Load output may occupy Smart Circuit 1.")

    # Detect if a Generator or Smart Circuits are physically installed
    installed = acc.get("installed", [])
    if "Generator" in installed:
        gen_status = acc.get("accessories", {}).get("generator", {}).get("status_desc")
        print(f"⚑ Generator physically wired. Current State: {gen_status}")
    if "Smart_Circuit" in installed:
        count = acc.get("accessories", {}).get("smart_circuits", {}).get("count", 0)
        print(f"🔌 {count} Smart Circuits physically tracked by this aGate.")

    # Renaming a Smart Circuit needs a raw POST because the endpoint is undocumented
    print("📝 Renaming Circuit 2 to 'EV Charger'...")
    circuit_payload = {"swId": 2, "name": "EV Charger"}
    await client._post("/hes-gateway/terminal/updateSmartCircuitName", payload=circuit_payload)

# asyncio.run(diagnose_regional_smart_circuits())

Complex Automation: Adaptive EV Solar Charging

This script tracks solar PV output, battery SOC, and grid power. It waits until solar generation exceeds a set threshold, confirms the site is exporting surplus power to the grid, checks that the battery is sufficiently charged, and optionally queries a Home Assistant WebSocket (for example, an Enphase integration) to verify the EV is present before switching the FranklinWH Smart Circuit on.

import asyncio
import json
import websockets # pip install websockets
from franklinwh_cloud import FranklinWHCloud

# Configuration Thresholds
SOLAR_THRESHOLD_KW = 4.0      # Minimum solar generation required to charge EV
MIN_SOC_PERCENT = 80          # Minimum FranklinWH battery level required
EV_CIRCUIT_ID = 2             # The Smart Circuit physical port (1, 2, or 3)
HA_WS_URL = "ws://homeassistant.local:8123/api/websocket"
HA_TOKEN = "eyJhbGciOiJIUzI1..." 

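async def check_ev_presence_ha():
    """Ask Home Assistant whether the EV is plugged in. Returns True if HA cannot say."""
    try:
        async with websockets.connect(HA_WS_URL) as ws:
            # HA sends "auth_required" first; reply with the token, then expect "auth_ok"
            await ws.recv()
            await ws.send(json.dumps({"type": "auth", "access_token": HA_TOKEN}))
            auth_reply = json.loads(await ws.recv())
            if auth_reply.get("type") != "auth_ok":
                raise RuntimeError(f"HA auth failed: {auth_reply.get('type')}")

            # Fetch all entity states and look for the EV charger sensor
            # (sensor.ev_charger_status is an example entity ID; substitute your own)
            req_id = 1
            await ws.send(json.dumps({"id": req_id, "type": "get_states"}))
            response = json.loads(await ws.recv())

            for entity in response.get("result", []):
                if entity["entity_id"] == "sensor.ev_charger_status":
                    return entity["state"] == "plugged_in"
    except Exception as e:
        print(f"HA WebSocket unreachable: {e}. Assuming the EV is plugged in.")
    # Fail open: the EV check is optional, so an HA outage or missing entity does not block charging
    return True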

async def adaptive_ev_charging_loop():
    client = FranklinWHCloud("user@example.com", "secret")

    while True:
        try:
            if not client.is_authenticated():
                await client.login()
                await client.select_gateway()

            stats = await client.get_stats()
            solar_kw = stats.current.solar_production
            grid_kw = stats.current.grid_power
            soc = stats.current.soc

            # Check if Smart Circuit is already ON
            circuits = await client.get_smart_circuits_info()
            ev_circuit_active = circuits.get(EV_CIRCUIT_ID).is_on

            # Criterion 1: Plenty of solar headroom
            # Criterion 2: Battery SOC is healthy
            # Criterion 3: Grid power is negative (exporting surplus the home is not consuming)
            solar_sufficient = solar_kw >= SOLAR_THRESHOLD_KW
            battery_sufficient = soc >= MIN_SOC_PERCENT
            excess_power_available = grid_kw < 0 

            if solar_sufficient and battery_sufficient and excess_power_available:
                if not ev_circuit_active:
                    # Optional: Verify the EV is physically connected via Home Assistant
                    ev_plugged_in = await check_ev_presence_ha()
                    if ev_plugged_in:
                        print(f"Criteria Met (Solar: {solar_kw}kW, SOC: {soc}%, Grid: {grid_kw}kW). Activating EV Circuit!")
                        # Toggle the specific Smart Circuit ON dynamically
                        switches = [None, None, None]
                        switches[EV_CIRCUIT_ID - 1] = True 
                        await client.set_smart_switch_state(tuple(switches))

            # Hysteresis / Shutdown Logic 
            # If clouds roll in and we start heavily draining from the grid, cut the charger
            elif grid_kw > 1.0 and ev_circuit_active:
                print("Solar dropped or grid demand spiked. Deactivating EV circuit.")
                switches = [None, None, None]
                switches[EV_CIRCUIT_ID - 1] = False
                await client.set_smart_switch_state(tuple(switches))

        except Exception as e:
            print(f"Polling fault: {e}")

        await asyncio.sleep(60) # Poll every 60 seconds

# asyncio.run(adaptive_ev_charging_loop())
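
The on/off decision in the loop above can be pulled into a pure function so the thresholds are testable without touching the gateway. This is a sketch rather than part of the library: decide_ev_action and GRID_IMPORT_CUTOFF_KW are hypothetical names, and it assumes the same sign convention as the recipe (negative grid power means export). The optional Home Assistant check is left out.

```python
SOLAR_THRESHOLD_KW = 4.0      # same thresholds as the recipe above
MIN_SOC_PERCENT = 80
GRID_IMPORT_CUTOFF_KW = 1.0   # hypothetical name for the 1.0 kW shutdown limit


def decide_ev_action(solar_kw, soc, grid_kw, circuit_on):
    """Return "on", "off", or "hold" for the EV Smart Circuit."""
    surplus = (
        solar_kw >= SOLAR_THRESHOLD_KW
        and soc >= MIN_SOC_PERCENT
        and grid_kw < 0          # negative = exporting to the grid
    )
    if surplus:
        return "hold" if circuit_on else "on"
    # Only cut the charger once grid import clearly exceeds the cutoff
    if grid_kw > GRID_IMPORT_CUTOFF_KW and circuit_on:
        return "off"
    return "hold"


print(decide_ev_action(5.0, 90, -1.2, circuit_on=False))  # on
print(decide_ev_action(1.0, 90, 2.5, circuit_on=True))    # off
print(decide_ev_action(1.0, 90, 0.5, circuit_on=True))    # hold
```

The gap between the turn-on condition (exporting) and the turn-off condition (importing more than 1 kW) is what keeps the circuit from flapping when a cloud passes.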

Storm Hedge

Real-Time Weather Event Polling

The FranklinWH aGate regularly polls national weather services for incoming storm cells. You can poll these lists yourself to drive Home Assistant automations, such as closing shutters or pre-chilling HVAC systems.

from franklinwh_cloud import FranklinWHCloud

async def poll_weather_events():
    client = FranklinWHCloud("user@example.com", "secret")
    await client.login()
    await client.select_gateway()

    # Get active storm warnings tracked by the aGate
    active_storms = await client.get_progressing_storm_list()
    if active_storms:
        for storm in active_storms:
            print(f"🚨 Storm Detected: {storm.get('title')}")
            print(f"   Severity: {storm.get('severity')}")
            print(f"   Time: {storm.get('effective')} -> {storm.get('expires')}")
    else:
        print("☀️ No severe weather events tracked by FranklinWH.")

    # Check if Storm Hedge is actively protecting the battery
    storm_settings = await client.get_storm_settings()
    if storm_settings.get("switchStatus") == 1:
        print(f"Storm Hedge Enabled! Reserve SOC protected at {storm_settings.get('backUpSoc')}%")

Dispatch Code Reference

| Code | dispatchCodeType Enum | Description |
|------|-----------------------|-------------|
| 1 | HOME / HOME_LOADS | aPower to home (surplus solar to grid) |
| 2 | STANDBY | aPower on standby (surplus solar to grid) |
| 3 | SOLAR / SOLAR_CHARGE | aPower charges from solar |
| 6 | SELF / SELF_CONSUMPTION | Self-consumption (surplus solar to grid) |
| 7 | GRID_EXPORT / GRID_DISCHARGE / FORCE_DISCHARGE | aPower to home/grid |
| 8 | GRID_CHARGE / GRID_IMPORT / FORCE_CHARGE | aPower charges from solar/grid |
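
If you want to refer to these codes by name in your own scripts, one option is a local IntEnum with aliases. The class below is a hypothetical stand-in built only from the table above; the library's own dispatchCodeType enum may be organised differently.

```python
from enum import IntEnum


class DispatchCode(IntEnum):
    """Local mirror of the dispatch table; later names with the same value are aliases."""
    HOME = 1
    HOME_LOADS = 1
    STANDBY = 2
    SOLAR = 3
    SOLAR_CHARGE = 3
    SELF = 6
    SELF_CONSUMPTION = 6
    GRID_EXPORT = 7
    GRID_DISCHARGE = 7
    FORCE_DISCHARGE = 7
    GRID_CHARGE = 8
    GRID_IMPORT = 8
    FORCE_CHARGE = 8


print(DispatchCode(6).name)                                      # SELF
print(DispatchCode["FORCE_CHARGE"] is DispatchCode.GRID_CHARGE)  # True
print(int(DispatchCode.GRID_EXPORT))                             # 7
```

Because the members are integers, `int(DispatchCode.GRID_CHARGE)` can be used wherever a schedule block expects a numeric dispatchId.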

Method Count Summary

| Mixin | Methods | Category |
|-------|---------|----------|
| discover.py | 1 | Device discovery (3-tier survey) |
| stats.py | 4 | Power flow, runtime data |
| modes.py | 4 | Operating mode control |
| tou.py | 17 | TOU schedule + tariff management |
| power.py | 5 | Grid status, PCS settings |
| devices.py | 16 | Hardware, BMS, smart circuits, LED, generator |
| storm.py | 5 | Weather, Storm Hedge |
| account.py | 18 | Account, notifications, alarms, AI |
| Total | 70 | |

Exception Handling & Reliability

FranklinWH Cloud infrastructure can occasionally experience timeouts, offline gateways, or session invalidation. Wrap your integration scripts in handlers for the library's typed exceptions.

import asyncio
from franklinwh_cloud import FranklinWHCloud
from franklinwh_cloud.exceptions import (
    FranklinWHTimeoutError,
    DeviceTimeoutException,
    GatewayOfflineException,
    TokenExpiredException
)

async def reliable_polling_loop():
    client = FranklinWHCloud("user@example.com", "secret")

    while True:
        try:
            if not client.is_authenticated():
                await client.login()
                await client.select_gateway()

            # Perform routine queries
            snapshot = await client.get_stats()
            print(f"Current SoC: {snapshot.current.soc}%")

        except FranklinWHTimeoutError as e:
            # Raised when the HTTP connection to the cloud times out or is dropped
            print(f"Cloud API timed out: {e}. Retrying in 60s...")

        except DeviceTimeoutException as e:
            # Raised when the Cloud API is online but the physical aGate has stopped responding
            print(f"Your Gateway dropped offline from the mesh: {e}")

        except GatewayOfflineException as e:
            # Raised when a WRITE command targets a gateway already known to be offline
            print(f"WRITE blocked. Gateway disconnected: {e}")

        except TokenExpiredException:
            # Raised when the session JWT expires; clear it so the next loop logs in again
            print("Session dead. Flagging rotation for next loop...")
            client.clear_token()

        except Exception as e:
            # Generic catch-all for parsing failures
            print(f"Unexpected fault: {e}")

        await asyncio.sleep(60)

Key Exception Hierarchy

All library-specific exceptions inherit from FranklinWHError. Below are the primary failure modes:

| Exception Class | Cause | Resolution |
|-----------------|-------|------------|
| FranklinWHTimeoutError | Raw API unresponsive / Connection reset | Standard retry backoff. |
| DeviceTimeoutException | Node-to-Cloud telemetry lost (offlineReason triggered). | Investigate Edge WiFi or wait for 4G cellular failover. |
| TokenExpiredException | JWT session rotation required. | Invoke client.login() or client.get_token() to renew. |
| InvalidCredentialsException | 401 Unauthorized (Code 10009). | Verify username/password or LOGIN_TYPE flag. |
| BadRequestParsingError | JSON blob abruptly mutated (V1 to V2). | Ensure dependencies are tracking the latest PyPI distribution. |

Raw Cloud Endpoints vs Pythonic Aliases

The franklinwh-cloud library internally negotiates with a series of undocumented REST and MQTT endpoints. To keep abstractions clean, it provides simplified snake_case wrappers (e.g. get_device_info(), get_stats()) around these raw endpoints.

If you are inspecting or reverse-engineering the native Cloud API, you can hit the raw endpoints directly via client._get() and client._post(). This bypasses the dataclass parsing and returns the raw JSON.

# The Pythonic Wrapper (Recommended)
# Safely parses the response, resolves typologies, and returns a Stats dataclass
stats = await client.get_stats()

# The Raw Cloud API equivalent (For reverse-engineering)
# Using the internal protected method to hit the endpoint exactly as the mobile app does.
url = "https://energy.franklinwh.com/hes-gateway/terminal/getDeviceCompositeInfo"
# Must dynamically include gateway ID for most terminal operations
raw_json = await client._get(url, params={"gatewayId": client.gateway})
print(f"Raw Firmware string: {raw_json['result']['runtimeData']['fhpVersions']}")

Extracting CloudFront Edge Failovers (Log Analysis)

The library tracks CloudFront PoP (Point of Presence) edge nodes by reading the x-amz-cf-pop header returned by AWS. A transition event is raised when AWS shifts your active session to a different geographical edge location (for example: SYD62-P1 → MEL50-C1).

The edge tracker mechanism automatically emits a standard application WARN log whenever your active session shifts. You can extract these failover events directly from your historical application logs (e.g., if you pipe them via standard out or text logging files) using basic log analysis.

Example Command (grep):

# Search historical logs for edge failover warnings
grep "CloudFront edge transition" app.log

# Expected Terminal Output:
# [WARN] ☁️ CloudFront edge transition: SYD62-P1 → MEL50-C1
# [WARN] ☁️ CloudFront edge transition: MEL50-C1 → SYD62-P1
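
To go beyond grep, the same log lines can be tallied in Python. This is a sketch that assumes the log line format matches the sample output above; count_edge_transitions is a hypothetical helper, not a library function.

```python
import re
from collections import Counter

# Matches "CloudFront edge transition: <from> → <to>" (also accepts "->")
TRANSITION_RE = re.compile(r"CloudFront edge transition:\s*(\S+)\s*(?:→|->)\s*(\S+)")


def count_edge_transitions(lines):
    """Count how often each (from_pop, to_pop) transition appears in the log lines."""
    counts = Counter()
    for line in lines:
        match = TRANSITION_RE.search(line)
        if match:
            counts[(match.group(1), match.group(2))] += 1
    return counts


sample = [
    "[WARN] ☁️ CloudFront edge transition: SYD62-P1 → MEL50-C1",
    "[INFO] poll ok",
    "[WARN] ☁️ CloudFront edge transition: MEL50-C1 → SYD62-P1",
    "[WARN] ☁️ CloudFront edge transition: SYD62-P1 → MEL50-C1",
]
for (src, dst), n in count_edge_transitions(sample).most_common():
    print(f"{src} -> {dst}: {n}")
# SYD62-P1 -> MEL50-C1: 2
# MEL50-C1 -> SYD62-P1: 1
```

For a real log file, pass an open file handle, for example `count_edge_transitions(open("app.log", encoding="utf-8"))`.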

To inspect the current edge routing from Python, take a snapshot from the client's metrics, if your client version exposes them:

# get_metrics() may not exist on every client version, so probe before calling
get_metrics = getattr(client, "get_metrics", None)
if callable(get_metrics):
    edge_data = get_metrics().get("edge")  # or client.get_edge_tracker().snapshot(), if exposed
    print(edge_data)

Wave Type (Pricing Tier) Reference

| Code | WaveType Enum | Description |
|------|---------------|-------------|
| 0 | OFF_PEAK | Off-peak pricing tier |
| 1 | MID_PEAK | Mid-peak pricing tier |
| 2 | ON_PEAK | On-peak pricing tier |
| 4 | SUPER_OFF_PEAK | Super off-peak pricing tier |

Work Mode Reference

| Code | workModeType Enum | Constant | Description |
|------|-------------------|----------|-------------|
| 1 | TIME_OF_USE | TIME_OF_USE | TOU dispatch schedule controls the battery |
| 2 | SELF_CONSUMPTION | SELF_CONSUMPTION | Maximise solar self-use |
| 3 | EMERGENCY_BACKUP | EMERGENCY_BACKUP | Full battery backup |

Import: from franklinwh_cloud.const import TIME_OF_USE, SELF_CONSUMPTION, EMERGENCY_BACKUP

Full details in TOU_SCHEDULE_GUIDE.md


🩺 TOU Health Check & Recovery

The FranklinWH Cloud API has no native endpoint to confirm whether a gateway is physically executing its programmed TOU schedule. A gateway can be in TOU mode but stuck in standby: the cloud reports the schedule as active, but the battery does nothing. This section documents the library's diagnostic and recovery tooling built to detect and resolve this condition.

[!IMPORTANT] These methods make real API calls and reset_tou_mode() writes to the gateway. Call them sparingly and on-demand only, never in a polling loop. For multi-gateway accounts, always scope calls to the specific gateway that has the suspected fault. Do not call health checks across all gateways on every poll.


Background: touSendStatus Field

getGatewayTouListV2 (called internally by get_tou_health()) returns two sync indicator fields alongside the schedule. These are the only confirmed values, observed across 1,253 HTTP Toolkit HAR samples:

| touSendStatus | touAlertMessage | Count | Meaning |
|---------------|-----------------|-------|---------|
| null | null | 1,141 | ✅ Settled: gateway has the current schedule, no sync pending |
| 2 | "Package settings are taking effect" | 25 | 🔄 In progress: schedule sent, gateway applying it now |
| 3 | "Package settings synchronization of failed, please check after setting again" | 87 | ⚠️ Sync alert: cloud did not receive gateway ACK |

[!NOTE] touSendStatus=3 is not always a real failure. The gateway may have applied the schedule to its local DB but the ACK response was lost (e.g. brief internet drop during sync). The mobile app surfaces this as a modal dialog and allows the user to "Apply Again". Treat it as a warning, not a hard fault, and cross-reference with run_status before acting.
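
As a rough illustration, the three observed values map onto the sync_status labels that get_tou_health() reports. classify_sync is a hypothetical helper, and the SYNC_UNKNOWN fallback is an assumption for values outside the observed set, not something the library is known to return.

```python
def classify_sync(tou_send_status):
    """Map a raw touSendStatus value to a sync label."""
    if tou_send_status is None:
        return "SYNC_OK"        # settled, nothing pending
    if tou_send_status == 2:
        return "SYNC_PENDING"   # gateway is applying the schedule
    if tou_send_status == 3:
        return "SYNC_ALERT"     # no ACK seen; warning, not proof of failure
    return "SYNC_UNKNOWN"       # hypothetical fallback, never observed in the samples


for raw in (None, 2, 3):
    print(raw, "->", classify_sync(raw))
```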


get_tou_health() - Read-Only Diagnostic

Mixin: TouMixin | Calls: getGatewayTouListV2 + get_tou_info(1) + live stats

Returns a single verdict dict describing whether the gateway is correctly executing the active TOU schedule block for the current time.

health = await client.get_tou_health(live_stats=last_stats_dict)

Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| live_stats | dict \| None | None | Pass the last polled stats dict to avoid an extra API call. If None, the method calls get_stats() internally. |

Return dict:

| Key | Type | Description |
|-----|------|-------------|
| ok | bool | True only when health_status == "HEALTHY" |
| health_status | str | "HEALTHY" / "DEGRADED" / "FAULT" |
| active_mode | str | Raw mode name from the API |
| is_tou_mode | bool | Whether the gateway is currently in TOU mode |
| sync_status | str | "SYNC_OK" / "SYNC_PENDING" / "SYNC_ALERT" |
| tou_send_status | int \| None | Raw touSendStatus value |
| tou_alert_message | str | Raw touAlertMessage value (empty string if null) |
| run_status | int \| None | Raw run_status (0=Standby, 1=Charging, 2=Discharging) |
| run_status_label | str | Human label: "Standby" / "Charging" / "Discharging" |
| active_block | dict \| None | The schedule block that should be executing right now |
| expected_dispatch | str \| None | What the schedule says the battery should be doing |
| fault_reasons | list[str] | Non-empty list of reasons when DEGRADED or FAULT |

Health verdict rules:

| Condition | Verdict |
|-----------|---------|
| Not in TOU mode | FAULT |
| No active schedule block for current time | DEGRADED |
| run_status matches expected dispatch | HEALTHY |
| run_status is Standby when dispatch expects action | FAULT |
| run_status is wrong direction | DEGRADED |
| touSendStatus=3 (SYNC_ALERT), any verdict | At least DEGRADED |
| touSendStatus=2 (SYNC_PENDING), otherwise HEALTHY | Downgraded to DEGRADED |
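
The rules above can be read as a small decision function. This is a sketch of the table, not the library's implementation: tou_verdict is a hypothetical name, and the caller is assumed to have already translated the active block's dispatch into the run_status it should produce (None when no block covers the current time).

```python
SEVERITY = {"HEALTHY": 0, "DEGRADED": 1, "FAULT": 2}
STANDBY, CHARGING, DISCHARGING = 0, 1, 2


def tou_verdict(is_tou_mode, expected_run_status, run_status, tou_send_status):
    """Apply the verdict rules in table order, then the sync floor."""
    if not is_tou_mode:
        verdict = "FAULT"
    elif expected_run_status is None:        # no active schedule block
        verdict = "DEGRADED"
    elif run_status == expected_run_status:
        verdict = "HEALTHY"
    elif run_status == STANDBY:              # idle when the schedule expects action
        verdict = "FAULT"
    else:                                    # charging vs discharging mismatch
        verdict = "DEGRADED"

    # A pending (2) or alerting (3) sync lifts HEALTHY to DEGRADED; it never softens a FAULT
    if tou_send_status in (2, 3) and SEVERITY[verdict] < SEVERITY["DEGRADED"]:
        verdict = "DEGRADED"
    return verdict


print(tou_verdict(True, DISCHARGING, DISCHARGING, None))  # HEALTHY
print(tou_verdict(True, DISCHARGING, STANDBY, None))      # FAULT
print(tou_verdict(True, DISCHARGING, DISCHARGING, 3))     # DEGRADED
```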

When to call it:

# ✅ CORRECT - on-demand, triggered by a user action or scheduled alert
if user_clicked_health_check_button:
    health = await client.get_tou_health(live_stats=cached_stats)

# ✅ CORRECT - once per poll cycle, for the specific gateway showing symptoms
# (e.g. run_status=0 during an expected discharge window)
if run_status == 0 and expected_dispatch in ("GRID_EXPORT", "GRID_CHARGE"):
    health = await client.get_tou_health(live_stats=current_stats)

# ❌ WRONG - never call in a tight loop or across every gateway on every poll
for gw in all_gateways:
    health = await client.get_tou_health()   # adds 2-3 extra API calls per gateway

reset_tou_mode() - Recovery (Write Operation)

Mixin: DevicesMixin | Calls: set_mode() × 2 + get_gateway_tou_list() × N

Performs a controlled Self-Consumption → Time-of-Use mode toggle. This causes the gateway firmware to re-read its local TOU schedule database from scratch, resolving the "stuck in standby" condition. After the toggle, polls getGatewayTouListV2 at 15-second intervals to confirm touSendStatus returns to null.

[!CAUTION] This is a write operation that temporarily changes the operating mode. It MUST only be called after presenting the fault to the user and receiving explicit confirmation. Never call automatically. The FHAI endpoint requires {"confirmed": true} in the POST body for this reason.

result = await client.reset_tou_mode(
    min_soc_pct=10,           # Default: 10%. SOC guard, rejects reset if battery too low
    max_verify_attempts=4,    # Default: 4. Max polls for gateway ACK
    verify_interval_s=15,     # Default: 15s. Delay between polls (mirrors mobile app)
)

Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| min_soc_pct | int | 10 | Minimum battery SOC% before allowing the reset. Prevents accidental discharge on a critically low battery. |
| max_verify_attempts | int | 4 | Maximum number of getGatewayTouListV2 polls (4 × 15s = 60s total window). |
| verify_interval_s | int | 15 | Seconds between verification polls. Matches the mobile app's observed post-apply polling cadence. |

Return dict:

| Key | Type | Description |
|-----|------|-------------|
| ok | bool | True if the mode toggle succeeded (regardless of sync ACK) |
| sync_cleared | bool | True if touSendStatus returned to null within the retry window |
| final_send_status | int \| None | Last observed touSendStatus after all retries |
| final_alert_message | str | Last observed touAlertMessage (empty string if null) |
| steps | list[str] | Full step-by-step log of every action taken |
| error | str \| None | Set only on fatal failure (SOC guard rejected, mode switch failed) |

ok vs sync_cleared:

  • ok=True, sync_cleared=True → Full success. Reset applied, gateway confirmed.
  • ok=True, sync_cleared=False → Reset applied but no ACK in the retry window. May be a false positive. Monitor run_status to verify physical execution.
  • ok=False → Fatal: SOC guard rejected the reset, or a mode switch API call failed. Check error for the reason.

Full Usage Example

import asyncio
from franklinwh_cloud import FranklinWHCloud

async def check_and_recover_tou():
    client = FranklinWHCloud.from_config("franklinwh.ini")
    await client.login()
    await client.select_gateway()

    # ── Step 1: Get current stats once (reuse for health check) ──
    stats = await client.get_stats()
    live = {
        "battery_soc": stats.current.soc,
        "run_status":  stats.current.run_status,
        "grid_power":  stats.current.grid_power,
        "battery_power": stats.current.battery_power,
    }

    # ── Step 2: Run health check (on-demand only) ──
    health = await client.get_tou_health(live_stats=live)

    print(f"TOU Health: {health['health_status']}")
    print(f"  Sync:     {health['sync_status']} (touSendStatus={health['tou_send_status']})")
    print(f"  Mode:     {health['active_mode']}  (TOU={health['is_tou_mode']})")
    print(f"  Battery:  {health['run_status_label']} (expected: {health['expected_dispatch']})")

    if health['fault_reasons']:
        print("  Faults:")
        for r in health['fault_reasons']:
            print(f"    • {r}")

    if health['ok']:
        print("✅ Gateway is executing schedule correctly.")
        return

    # ── Step 3: Gate the recovery behind explicit confirmation ──
    print(f"\n⚠️  Health verdict: {health['health_status']}")
    confirm = input("Run TOU mode reset? This will toggle Self-Consumption→TOU. (yes/no): ")
    if confirm.strip().lower() != "yes":
        print("Reset cancelled.")
        return

    # ── Step 4: Execute recovery ──
    print("\n🔄 Running TOU mode reset...")
    result = await client.reset_tou_mode(
        min_soc_pct=10,
        max_verify_attempts=4,   # up to 60 s of polling
        verify_interval_s=15,    # 15 s between polls (mobile app cadence)
    )

    print("\nReset steps:")
    for step in result['steps']:
        print(f"  {step}")

    if not result['ok']:
        print(f"\n❌ Reset failed: {result['error']}")
        return

    if result['sync_cleared']:
        print("\n✅ Reset complete - gateway ACK confirmed (touSendStatus=null).")
    else:
        print(
            f"\n⚠️  Reset sent but sync unconfirmed "
            f"(touSendStatus={result['final_send_status']}). "
            f"This may be a false positive. Monitor run_status to verify."
        )

    # ── Step 5: Recheck health after recovery ──
    await asyncio.sleep(5)
    stats2 = await client.get_stats()
    live2  = {"run_status": stats2.current.run_status}
    health2 = await client.get_tou_health(live_stats=live2)
    print(f"\n📋 Post-reset health: {health2['health_status']} - battery={health2['run_status_label']}")

asyncio.run(check_and_recover_tou())

Expected output (stuck gateway recovered):

TOU Health: FAULT
  Sync:     SYNC_OK (touSendStatus=None)
  Mode:     Ausgrid EA11 TOU  (TOU=True)
  Battery:  Standby (expected: GRID_EXPORT)
  Faults:
    • Gateway in TOU mode but run_status=Standby during a GRID_EXPORT block.
      Expected: battery discharging to grid.

⚠️  Health verdict: FAULT
Run TOU mode reset? This will toggle Self-Consumption→TOU. (yes/no): yes

🔄 Running TOU mode reset...

Reset steps:
  SOC guard passed: battery at 72%.
  Step 1: set_mode(Self-Consumption) → {'code': 200, ...}
  Step 2: 3 s pause complete.
  Step 3: set_mode(Time-of-Use) sent - polling for gateway ACK.
  Step 4 attempt 1/4: waiting 15 s…
  Step 4 attempt 1: touSendStatus=2 (still pending/failed). Retrying…
  Step 4 attempt 2/4: waiting 15 s…
  Step 4 attempt 2: touSendStatus=null - gateway ACK received, sync confirmed.
  TOU mode reset complete - schedule sync confirmed.

✅ Reset complete - gateway ACK confirmed (touSendStatus=null).

📋 Post-reset health: HEALTHY - battery=Discharging


Multi-Gateway Accounts - Scoping Rules

[!WARNING] get_tou_health() and reset_tou_mode() operate on the currently selected gateway (client.gateway). For multi-gateway accounts, you must select the correct gateway before calling these methods.

# ✅ CORRECT - select before calling
await client.select_gateway(gateway_id="10060006A0XXXXXXXXXX")
health = await client.get_tou_health()

# ❌ WRONG - never loop health checks across all gateways in a single cycle
# This is 3–5 extra API calls per gateway per poll. On a 4-gateway account
# that runs every 60s, this is 12–20 extra calls per minute unnecessarily.
for gw_id in all_gateway_ids:
    await client.select_gateway(gateway_id=gw_id)
    health = await client.get_tou_health()   # ← do not do this

Only run the health check for a specific gateway when there is a concrete signal that it may be stuck, e.g. run_status=0 (Standby) during a time window where the schedule dictates charging or discharging.
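
One way to enforce this in a poller is a small gate that only lets a health check through when the symptom is present and the same gateway has not been checked recently. HealthCheckGate and the 15-minute cooldown are hypothetical choices for illustration; the symptom test mirrors the run_status == 0 condition shown earlier.

```python
import time

ACTIVE_DISPATCHES = ("GRID_EXPORT", "GRID_CHARGE")


class HealthCheckGate:
    """Allow a health check only on a standby symptom, at most once per cooldown per gateway."""

    def __init__(self, cooldown_s=900.0, clock=time.monotonic):
        self.cooldown_s = cooldown_s   # hypothetical default: 15 minutes
        self.clock = clock             # injectable so the gate can be tested
        self._last_check = {}

    def should_check(self, gateway_id, run_status, expected_dispatch):
        # No symptom: battery is active, or the schedule does not expect action
        if run_status != 0 or expected_dispatch not in ACTIVE_DISPATCHES:
            return False
        now = self.clock()
        last = self._last_check.get(gateway_id)
        if last is not None and now - last < self.cooldown_s:
            return False               # checked this gateway recently
        self._last_check[gateway_id] = now
        return True


fake_now = [0.0]
gate = HealthCheckGate(cooldown_s=900, clock=lambda: fake_now[0])
print(gate.should_check("gw-A", 0, "GRID_EXPORT"))  # True
print(gate.should_check("gw-A", 0, "GRID_EXPORT"))  # False
fake_now[0] = 1000.0
print(gate.should_check("gw-A", 0, "GRID_EXPORT"))  # True
print(gate.should_check("gw-B", 2, "GRID_EXPORT"))  # False
```

In a poll loop, call `gate.should_check(gw_id, run_status, expected_dispatch)` first, and only then select that gateway and call get_tou_health().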