# Finnest Data Architecture
Date: 2026-04-16
Status: Draft
Scope: Data model, query patterns, tenant enforcement, event store, migration approach. Expands on the summary in architecture.md Part 5.
Related documents: architecture.md (main), ../brainstorms/brainstorm-04-data-model-migration.md, ../brainstorms/brainstorm-05-multi-industry-design.md, ../brainstorms/brainstorm-11-traffio-laravel-migration-naming.md.
## Schema Topology
One PostgreSQL cluster. 21 schemas — 18 domain schemas plus `events`, `agents`, and `public`.
| Schema | Tables | Tier | Key Entities |
|---|---|---|---|
| `public` | ~10 | Core | organisations, users, offices, feature_flags, audit_log, notifications, tags, id_mappings |
| `people` | ~10 | 1 | employees, employee_qualifications, leave_requests, leave_balances, contracts, unavailability |
| `recruit` | ~12 | 1 | job_orders, candidate_pools, candidate_assessments, scoring_results, outreach_log, talent_pools, drip_campaigns |
| `onboard` | ~14 | 1 | onboarding_profiles, document_verifications, verifiable_documents, verification_stages, extracted_fields, identity_checklists, tfn_declarations, super_choices, bank_details, pipeline_steps |
| `roster` | ~12 | 1 | shifts, shift_templates, shift_assignments, shift_codes, roster_rules, availability, demand_forecasts, shift_swaps, shift_bids, block_lists |
| `timekeep` | ~10 | 1 | timecards, timecard_entries, timecard_allowances, clock_events, terminals, geo_locations, breaks, journals, dockets |
| `reach` | ~8 | 1 | conversations, messages, message_deliveries, channels, templates, handoff_records, news_feed, announcements |
| `pulse` | ~7 | 1 | automation_runs, automation_results, optimisation_runs, anomaly_detections, predictive_models, dashboards |
| `payroll` | ~15 | 2 | pay_runs, pay_lines, tax_calculations, super_calculations, deductions, stp_submissions, invoices, payments, rate_cards |
| `clients` | ~10 | 2 | clients, contacts, sites, agencies, rate_cards, rate_configurations, job_costings, client_hierarchies, opportunities |
| `safety` | ~12 | 2 | incidents, investigations, hazards, risk_assessments, inspections, swms_documents, corrective_actions, drug_alcohol_tests, custom_forms |
| `assets` | ~10 | 2 | equipment, equipment_assignments, maintenance_schedules, work_orders, fleet_vehicles, prestart_checklists |
| `quotes` | ~8 | 3 | leads, quotes, jobs, tasks, milestones, agreements, signatures |
| `learn` | ~8 | 3 | courses, tutors, enrollments, progress, certifications, learning_paths, reinduction_schedules |
| `benefits` | ~6 | 3 | programs, enrollments, perks, recognitions, rewards_points, ewa_transactions |
| `compliance` | ~12 | 4 | credential_types, industry_profiles, awards, award_classifications, award_rates, compliance_rules, ongoing_monitors, compliance_scores, labour_hire_licensing, dvs_checks, pep_screening |
| `fatigue` | ~6 | 4 | fatigue_rules, driver_hours, rest_periods, ewd_entries, fitness_assessments |
| `clearance` | ~5 | 4 | clearance_applications, clearance_statuses, vetting_records, program_separations |
| `performance` | ~10 | 5 | goals, reviews, feedback, surveys, enps_scores, calibrations, development_plans, skills_matrix |
| `events` | 1 | Infra | domain_events (partitioned by month) |
| `agents` | ~5 | Infra | sessions, messages, memories, budget_limits, tool_audit |
| **Total** | **~195** | | |
## Design Principles (non-negotiable)
From B04 + Commandment #16 + DA-* guardrails:
- Every table has `org_id` — except `public.organisations` itself (DA-11)
- Every table has `inserted_at`, `updated_at`, `deleted_at` — soft delete via timestamp (DA-12)
- Every primary key is a UUID (`gen_random_uuid()`) — no auto-increment (DA-13)
- JSONB for industry-specific extension fields — the `metadata` column pattern
- Typed Ecto embeds for structured JSON — validate at application level
- No cross-schema JOINs for operational queries (AR-07) — events instead
- Reference-data schemas readable cross-schema — `compliance.credential_types`, `.industry_profiles`, `.awards` only
- PII encrypted at rest — TFN, bank, tax, passport, MFA secrets via Cloak AES-256-GCM (SE-13)
- Leading `org_id` on every operational index — tenant-first
- Partial indexes `WHERE deleted_at IS NULL` for active-record queries
- All migrations reversible (CQ-11)
- UTF-8 encoding, `en_AU.UTF-8` collation (DA-05)
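Taken together, these conventions imply a standard table shape. A hypothetical sketch (the table and columns are illustrative only, not part of the schema topology above):

```sql
-- Hypothetical example — illustrates the DA-11/12/13 conventions, not a real table.
CREATE TABLE roster.example_entities (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),        -- DA-13: UUID PK
    org_id      UUID NOT NULL REFERENCES public.organisations(id), -- DA-11: tenant column
    name        VARCHAR NOT NULL,
    metadata    JSONB NOT NULL DEFAULT '{}',  -- industry-specific extension fields
    inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),                -- DA-12
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    deleted_at  TIMESTAMPTZ                   -- soft delete via timestamp
);

-- Tenant-first index with the active-record partial predicate
CREATE INDEX idx_example_entities_org ON roster.example_entities (org_id, name)
    WHERE deleted_at IS NULL;
```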
## Core Entities
### `public.organisations` — tenant root
CREATE TABLE public.organisations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR NOT NULL,
slug VARCHAR UNIQUE NOT NULL,
abn VARCHAR, -- Australian Business Number
industry_profile_id UUID REFERENCES compliance.industry_profiles(id),
subscription_tier VARCHAR NOT NULL DEFAULT 'tier1',
settings JSONB NOT NULL DEFAULT '{}', -- ip_allowlist, agent_action_mode, etc.
irap_classified BOOLEAN NOT NULL DEFAULT false, -- routing/enforcement flag
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE UNIQUE INDEX idx_organisations_slug ON public.organisations(slug) WHERE deleted_at IS NULL;
CREATE INDEX idx_organisations_abn ON public.organisations(abn) WHERE abn IS NOT NULL;
### `public.users` — merged from v2's three user tables (B04)
CREATE TABLE public.users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL REFERENCES public.organisations(id),
email VARCHAR NOT NULL,
email_citext CITEXT GENERATED ALWAYS AS (LOWER(email)) STORED,
hashed_password VARCHAR NOT NULL, -- Argon2
role VARCHAR NOT NULL, -- director|admin|staff|payroll|supervisor|worker|client_contact
mfa_enabled BOOLEAN NOT NULL DEFAULT false,
mfa_method VARCHAR, -- fido2|totp (totp only in commercial for non-admin)
mfa_secret_encrypted BYTEA, -- AES-256-GCM via Cloak
fido2_credentials JSONB DEFAULT '[]', -- registered public keys
last_login_at TIMESTAMPTZ,
last_login_ip INET,
session_timeout_seconds INTEGER, -- null = use config default
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE UNIQUE INDEX idx_users_org_email ON public.users(org_id, email_citext) WHERE deleted_at IS NULL;
CREATE INDEX idx_users_role ON public.users(org_id, role) WHERE deleted_at IS NULL;
### `public.id_mappings` — migration backbone (B04 Insight 2)
Populated by migration workers during Strangler Fig; essential for cross-referencing v2 data to Finnest UUIDs.
CREATE TABLE public.id_mappings (
v2_database VARCHAR NOT NULL, -- 'admin_central' | 'admin_atslive' | 'actatek'
v2_table VARCHAR NOT NULL,
v2_id BIGINT NOT NULL,
finnest_schema VARCHAR NOT NULL,
finnest_table VARCHAR NOT NULL,
finnest_id UUID NOT NULL,
org_id UUID NOT NULL REFERENCES public.organisations(id),
migrated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
migration_batch VARCHAR, -- which migration run produced this row
PRIMARY KEY (v2_database, v2_table, v2_id)
);
CREATE INDEX idx_id_mappings_finnest ON public.id_mappings(finnest_schema, finnest_table, finnest_id);
CREATE INDEX idx_id_mappings_org ON public.id_mappings(org_id);
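A migration worker resolving a v2 foreign key to its Finnest UUID is then a single keyed lookup against the primary key (the `v2_table` and `v2_id` values here are illustrative):

```sql
-- Sketch: resolve a v2 row to its Finnest UUID during migration.
SELECT finnest_id, finnest_schema, finnest_table
FROM public.id_mappings
WHERE v2_database = 'admin_central'
  AND v2_table    = 'candidates'
  AND v2_id       = 12345;   -- illustrative v2 primary key
```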
### `people.employees` — single-record lifecycle (B12 H1)
CREATE TABLE people.employees (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL REFERENCES public.organisations(id),
user_id UUID REFERENCES public.users(id), -- nullable (not all employees are users)
employee_number VARCHAR,
first_name VARCHAR NOT NULL,
last_name VARCHAR NOT NULL,
preferred_name VARCHAR,
date_of_birth DATE,
gender VARCHAR,
email VARCHAR,
phone VARCHAR,
address JSONB,
emergency_contact JSONB,
tax_file_number_encrypted BYTEA, -- Cloak AES-256-GCM (SE-13)
employment_type VARCHAR NOT NULL, -- casual|part_time|full_time|contractor
employment_status VARCHAR NOT NULL DEFAULT 'onboarding',
start_date DATE,
termination_date DATE,
primary_office_id UUID REFERENCES public.offices(id),
primary_client_id UUID, -- refs clients.clients (cross-schema; read-only)
metadata JSONB NOT NULL DEFAULT '{}', -- industry-specific extended fields
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE INDEX idx_employees_org ON people.employees(org_id);
CREATE INDEX idx_employees_active ON people.employees(org_id, employment_status) WHERE deleted_at IS NULL;
CREATE INDEX idx_employees_name ON people.employees(org_id, last_name, first_name) WHERE deleted_at IS NULL;
CREATE INDEX idx_employees_user ON people.employees(user_id) WHERE user_id IS NOT NULL;
CREATE INDEX idx_employees_number ON people.employees(org_id, employee_number) WHERE employee_number IS NOT NULL;
### `compliance.credential_types` — hierarchical credential registry (B05)
CREATE TABLE compliance.credential_types (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
parent_id UUID REFERENCES compliance.credential_types(id),
code VARCHAR UNIQUE NOT NULL, -- e.g., "hrwl_forklift"
name VARCHAR NOT NULL,
category VARCHAR NOT NULL, -- high_risk_work|construction|mining|...
nationally_recognized BOOLEAN NOT NULL DEFAULT false,
issuing_jurisdictions JSONB NOT NULL DEFAULT '[]', -- ["NSW","VIC",...]
mutual_recognition BOOLEAN NOT NULL DEFAULT true,
renewal_period_months INTEGER, -- NULL = no expiry
prerequisites JSONB NOT NULL DEFAULT '[]', -- [credential_type_ids]
industries JSONB NOT NULL DEFAULT '[]', -- ["construction","mining",...]
verification_method VARCHAR NOT NULL, -- document|api|manual|self_declared
document_templates JSONB NOT NULL DEFAULT '{}',
metadata JSONB NOT NULL DEFAULT '{}',
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
-- Seeded with ~100+ credential types across all industries (B05 seed list)
Note: This table is a reference-data exception to AR-07 — readable cross-schema by 8+ domains. No org_id because credentials are platform-wide facts (a "forklift license" is the same everywhere).
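Because `parent_id` makes the registry hierarchical, descendant lookups are a recursive walk. A sketch (the parent `code` value is an assumption for illustration):

```sql
-- Sketch: expand a credential type and all of its descendants via parent_id.
WITH RECURSIVE tree AS (
    SELECT id, code, parent_id
    FROM compliance.credential_types
    WHERE code = 'high_risk_work'      -- illustrative parent code
      AND deleted_at IS NULL
  UNION ALL
    SELECT c.id, c.code, c.parent_id
    FROM compliance.credential_types c
    JOIN tree t ON c.parent_id = t.id
    WHERE c.deleted_at IS NULL
)
SELECT code FROM tree;
```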
### `compliance.industry_profiles` — composable per-org (B05)
CREATE TABLE compliance.industry_profiles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
slug VARCHAR UNIQUE NOT NULL,
display_name VARCHAR NOT NULL,
description TEXT,
required_credential_types JSONB NOT NULL DEFAULT '[]', -- credential_type_ids
recommended_credential_types JSONB NOT NULL DEFAULT '[]',
enabled_modules JSONB NOT NULL DEFAULT '[]', -- ["safety","fatigue","clearance"]
onboarding_steps JSONB NOT NULL DEFAULT '[]',
compliance_rules JSONB NOT NULL DEFAULT '{}',
terminology JSONB NOT NULL DEFAULT '{}', -- {"worker":"operative","shift":"deployment"}
dashboard_config JSONB NOT NULL DEFAULT '{}',
report_templates JSONB NOT NULL DEFAULT '{}',
metadata JSONB NOT NULL DEFAULT '{}',
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE TABLE compliance.org_industry_profiles (
org_id UUID NOT NULL REFERENCES public.organisations(id),
industry_profile_id UUID NOT NULL REFERENCES compliance.industry_profiles(id),
activated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
custom_overrides JSONB NOT NULL DEFAULT '{}',
PRIMARY KEY (org_id, industry_profile_id)
);
Ten profiles seeded by end of Phase 3: labour_hire, construction, mining, logistics, defence, retail, white_collar, traffic, civil, engineering. Composable — an org activates multiple profiles and their requirements are unioned.
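A minimal sketch of the union semantics (module name and field access are illustrative, not a locked API):

```elixir
# Sketch only: an org's effective requirements are the union across its
# activated profiles. Profile structs mirror the columns above.
defmodule FinnestCompliance.EffectiveProfile do
  @doc "Union of required credential type ids across activated profiles."
  def required_credential_types(profiles) do
    profiles
    |> Enum.flat_map(& &1.required_credential_types)
    |> MapSet.new()
  end

  @doc "Union of enabled modules across activated profiles."
  def enabled_modules(profiles) do
    profiles
    |> Enum.flat_map(& &1.enabled_modules)
    |> MapSet.new()
  end
end
```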
## Event Store (the load-bearing table)
CREATE SCHEMA events;
CREATE TABLE events.domain_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
domain VARCHAR(50) NOT NULL, -- 'roster', 'payroll'
event_type VARCHAR(100) NOT NULL, -- 'shift_assigned', 'pay_run_finalised'
aggregate_id UUID, -- entity the event belongs to
org_id UUID NOT NULL, -- tenant isolation
payload JSONB NOT NULL, -- event data (schema-versioned)
metadata JSONB NOT NULL DEFAULT '{}', -- correlation_id, causation_id, actor_id,
-- agent_id, classification, schema_version
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
prev_hash BYTEA, -- tamper-evident hash chain
hash BYTEA NOT NULL -- sha256(prev_hash || id || org_id || ...)
) PARTITION BY RANGE (inserted_at);
-- Monthly partitions (DA-14) — ensures query locality and retention management
CREATE TABLE events.domain_events_2026_04 PARTITION OF events.domain_events
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE events.domain_events_2026_05 PARTITION OF events.domain_events
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- ... (monthly partition creation runs as scheduled Oban job)
-- Indexes (per partition, inherited)
CREATE INDEX idx_events_org_domain_type ON events.domain_events (org_id, domain, event_type, inserted_at DESC);
CREATE INDEX idx_events_aggregate ON events.domain_events (aggregate_id) WHERE aggregate_id IS NOT NULL;
CREATE INDEX idx_events_correlation ON events.domain_events ((metadata->>'correlation_id'));
-- Immutability trigger (AR-17)
CREATE OR REPLACE FUNCTION events.prevent_modification()
RETURNS TRIGGER AS $$
BEGIN
RAISE EXCEPTION 'Events are immutable. UPDATE and DELETE are not allowed.';
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER no_modify_events
BEFORE UPDATE OR DELETE ON events.domain_events
FOR EACH ROW EXECUTE FUNCTION events.prevent_modification();
-- Hash chain on insert (SE-21, IR-10) — digest/2 requires the pgcrypto extension
CREATE OR REPLACE FUNCTION events.compute_hash()
RETURNS TRIGGER AS $$
DECLARE
last_hash BYTEA;
hash_input TEXT;
BEGIN
-- Get the most recent hash for this org (chain per-org)
SELECT hash INTO last_hash
FROM events.domain_events
WHERE org_id = NEW.org_id
ORDER BY inserted_at DESC
LIMIT 1;
NEW.prev_hash := last_hash;
hash_input := COALESCE(encode(last_hash, 'hex'), '') ||
NEW.id::text ||
NEW.org_id::text ||
NEW.domain ||
NEW.event_type ||
COALESCE(NEW.aggregate_id::text, '') ||
NEW.payload::text ||
NEW.inserted_at::text;
NEW.hash := digest(hash_input, 'sha256');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER hash_chain_on_insert
BEFORE INSERT ON events.domain_events
FOR EACH ROW EXECUTE FUNCTION events.compute_hash();
Three responsibilities, one table (B02 Insight 3):
- Cross-domain event bus — Phoenix.PubSub broadcasts on insert (via Postgres LISTEN/NOTIFY or an Elixir broadcast triggered by `EventStore.append/1`)
- IRAP-compliant audit log — append-only + immutable + hash-chained satisfies 7-year retention and tamper-evidence (IR-10, SE-21)
- Data flywheel — historical events enable anomaly detection, agent memory L3, industry intelligence aggregations
Hash chain verification (monthly script):
defmodule FinnestCore.EventStore.Integrity do
  import Ecto.Query
  alias Finnest.Repo

  @doc "Walks the chain for a given org, asserting integrity. Returns :ok or {:error, break_at_event_id}."
  def verify(org_id) do
    events =
      from(e in "domain_events",
        prefix: "events",
        where: e.org_id == type(^org_id, Ecto.UUID),
        order_by: [asc: e.inserted_at],
        select: %{
          id: e.id,
          prev_hash: e.prev_hash,
          hash: e.hash,
          domain: e.domain,
          event_type: e.event_type,
          aggregate_id: e.aggregate_id,
          payload: e.payload,
          inserted_at: e.inserted_at
        }
      )
      # Runs as a scheduled script, outside any request — no tenant context to scope by.
      |> Repo.all(skip_org_scope: true)

    events
    |> Enum.reduce_while({:ok, nil}, fn event, {:ok, last_hash} ->
      # recompute_hash/1 mirrors the concatenation in events.compute_hash()
      if event.prev_hash == last_hash and recompute_hash(event) == event.hash do
        {:cont, {:ok, event.hash}}
      else
        {:halt, {:error, event.id}}
      end
    end)
    |> case do
      {:ok, _last_hash} -> :ok
      {:error, _event_id} = error -> error
    end
  end
end
## Agents Schema
CREATE SCHEMA agents;
CREATE TABLE agents.sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL REFERENCES public.organisations(id),
user_id UUID NOT NULL REFERENCES public.users(id),
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ended_at TIMESTAMPTZ,
metadata JSONB NOT NULL DEFAULT '{}',
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE agents.messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES agents.sessions(id),
org_id UUID NOT NULL,
role VARCHAR NOT NULL, -- user|assistant|tool|system
content TEXT, -- user/assistant message content (retention per classification)
tool_calls JSONB, -- array of tool invocations made by this message
tokens INTEGER, -- count for assistant messages
latency_ms INTEGER, -- time to generate
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE agents.memories (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
level VARCHAR NOT NULL, -- 'l2_tenant' | 'l3_platform'
org_id UUID, -- NULL for L3 (platform-wide anonymised)
scope VARCHAR, -- e.g. 'client:<id>' or 'industry:mining'
memory_type VARCHAR NOT NULL, -- preference|pattern|fact
content JSONB NOT NULL,
embedding VECTOR(1536), -- pgvector (future phase)
source JSONB, -- {session_id, correlation_id, agent_id, confirmed_by}
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_used_at TIMESTAMPTZ,
use_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_memories_org_scope ON agents.memories(org_id, scope) WHERE level = 'l2_tenant';
CREATE INDEX idx_memories_platform_scope ON agents.memories(scope) WHERE level = 'l3_platform';
-- pgvector index added in future phase:
-- CREATE INDEX idx_memories_embedding ON agents.memories USING ivfflat (embedding vector_cosine_ops);
CREATE TABLE agents.budget_limits (
org_id UUID PRIMARY KEY REFERENCES public.organisations(id),
daily_limit DECIMAL(10,2) NOT NULL,
weekly_limit DECIMAL(10,2) NOT NULL,
monthly_limit DECIMAL(10,2) NOT NULL,
daily_used DECIMAL(10,2) NOT NULL DEFAULT 0,
weekly_used DECIMAL(10,2) NOT NULL DEFAULT 0,
monthly_used DECIMAL(10,2) NOT NULL DEFAULT 0,
day_rolling DATE NOT NULL DEFAULT CURRENT_DATE,
week_rolling DATE NOT NULL DEFAULT CURRENT_DATE,
month_rolling DATE NOT NULL DEFAULT DATE_TRUNC('month', CURRENT_DATE),
warning_sent_at TIMESTAMPTZ,
circuit_open_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
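The counters could be consumed atomically with an UPDATE-with-guard, a sketch of one possible reservation flow (cost value and placeholder org id are illustrative; the real logic lives in the application):

```sql
-- Sketch: reserve spend against the daily budget in one atomic statement.
-- No row returned means the limit would be exceeded (caller opens the circuit).
UPDATE agents.budget_limits
SET daily_used = daily_used + 0.25,   -- cost of this call in AUD (illustrative)
    updated_at = NOW()
WHERE org_id = '00000000-0000-0000-0000-000000000000'  -- placeholder org id
  AND daily_used + 0.25 <= daily_limit
RETURNING daily_used;
```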
CREATE TABLE agents.tool_audit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
session_id UUID,
tool_name VARCHAR NOT NULL,
category VARCHAR NOT NULL, -- read|propose|execute|restricted
input_hash BYTEA NOT NULL, -- hash of input (not plaintext — PII safe)
output_hash BYTEA,
prompt_hash BYTEA, -- for LLM calls
response_hash BYTEA,
model VARCHAR,
input_tokens INTEGER,
output_tokens INTEGER,
cache_read_tokens INTEGER,
cache_creation_tokens INTEGER,
cost_aud DECIMAL(10,4),
duration_ms INTEGER NOT NULL,
result VARCHAR NOT NULL, -- success|error|rate_limited|budget_exceeded|timeout
error_class VARCHAR, -- exception module if error
correlation_id UUID NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_tool_audit_org_time ON agents.tool_audit(org_id, inserted_at DESC);
CREATE INDEX idx_tool_audit_correlation ON agents.tool_audit(correlation_id);
CREATE INDEX idx_tool_audit_tool ON agents.tool_audit(tool_name, inserted_at DESC);
## Tenant Enforcement
### Custom `Finnest.Repo.prepare_query` hook (locked Part 5)
defmodule Finnest.Repo do
use Ecto.Repo, otp_app: :finnest_core, adapter: Ecto.Adapters.Postgres
@doc """
Automatically scopes every query to current_org_id.
Reads org_id from Process dictionary set by FinnestWeb.Plugs.Tenant.
Raises if no tenant context and the query isn't explicitly opted out.
Opt-out: pass skip_org_scope: true in query options.
Used for:
- migrations (cross-tenant by nature)
- audit reports (intentionally cross-tenant, privileged)
- id_mappings lookups during migration
"""
def prepare_query(_operation, query, opts) do
case Keyword.get(opts, :skip_org_scope) do
true ->
{query, opts}
_ ->
org_id = FinnestCore.Tenant.current_org_id!()
{scope_to_org(query, org_id), opts}
end
end
defp scope_to_org(query, org_id) do
# Uses Ecto.Query.where/3 with query inspection to inject org_id filter
# if not already present.
case query_has_org_scope?(query) do
true -> query
false -> Ecto.Query.where(query, [r], r.org_id == ^org_id)
end
end
# ... helpers ...
end
defmodule FinnestCore.Tenant do
@process_key :finnest_current_org_id
def current_org_id!, do: Process.get(@process_key) || raise("No tenant context set")
def current_org_id, do: Process.get(@process_key)
def put_org_id(org_id), do: Process.put(@process_key, org_id)
def clear, do: Process.delete(@process_key)
end
defmodule FinnestWeb.Plugs.Tenant do
import Plug.Conn
def init(opts), do: opts
def call(conn, _opts) do
case get_session(conn, :current_user) do
%{org_id: org_id} = user ->
FinnestCore.Tenant.put_org_id(org_id)
conn
|> assign(:current_user, user)
|> assign(:current_org_id, org_id)
nil ->
conn
|> halt()
|> send_resp(401, "")
end
end
end
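Because the tenant context lives in the Process dictionary, it does not follow spawned processes: `Task.async/1` and friends start with an empty dictionary. A hedged helper sketch for carrying it across process boundaries (the `with_org/2` name is an assumption, not a locked API):

```elixir
# Sketch: restore tenant context explicitly inside any spawned process,
# and always clear it so pooled processes never leak a tenant.
defmodule FinnestCore.Tenant.Helpers do
  def with_org(org_id, fun) do
    FinnestCore.Tenant.put_org_id(org_id)

    try do
      fun.()
    after
      FinnestCore.Tenant.clear()
    end
  end
end

# Usage from a request process:
#   org_id = FinnestCore.Tenant.current_org_id!()
#   Task.async(fn -> FinnestCore.Tenant.Helpers.with_org(org_id, &do_work/0) end)
```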
### Oban worker tenant context
defmodule FinnestCore.ObanWorker do
@moduledoc "Base for all workers — restores tenant context from job args."
defmacro __using__(opts) do
quote do
use Oban.Worker, unquote(opts)
@impl Oban.Worker
def perform(%Oban.Job{args: %{"org_id" => org_id} = args} = job) do
FinnestCore.Tenant.put_org_id(org_id)
FinnestCore.CorrelationId.put(args["correlation_id"] || job.id)
try do
do_perform(job)
after
FinnestCore.Tenant.clear()
FinnestCore.CorrelationId.clear()
end
end
def do_perform(_job), do: raise("do_perform/1 not implemented")
defoverridable do_perform: 1
end
end
end
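A concrete worker built on the base macro might look like this (assuming the base module is named `FinnestCore.ObanWorker`; queue and module names are illustrative):

```elixir
# Sketch: tenant context is restored by the base macro's perform/1,
# so do_perform/1 can use org-scoped Repo queries directly.
defmodule FinnestCore.Roster.PublishWorker do
  use FinnestCore.ObanWorker, queue: :roster, max_attempts: 5

  def do_perform(%Oban.Job{args: %{"shift_id" => _shift_id}}) do
    # ... publish the shift using org-scoped queries ...
    :ok
  end
end

# Enqueueing — org_id MUST travel in the args, or the base macro cannot
# restore tenant context in the worker process:
#   %{"org_id" => org_id, "shift_id" => shift_id, "correlation_id" => cid}
#   |> FinnestCore.Roster.PublishWorker.new()
#   |> Oban.insert()
```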
### Architecture test asserting enforcement
defmodule Finnest.ArchitectureTest.TenantIsolation do
use ExUnit.Case, async: false
@tag :architecture
test "every Ecto schema has org_id field (except allowlist)" do
allowlist = [
Finnest.Public.Organisation, # tenant root
Compliance.CredentialType, # platform-wide reference data
Compliance.IndustryProfile, # platform-wide reference data
Compliance.Award, # platform-wide reference data
Compliance.AwardClassification, # ditto
Compliance.AwardRate, # ditto
Finnest.Public.IdMapping # cross-tenant by nature during migration
]
schemas = all_ecto_schemas() -- allowlist
for schema <- schemas do
fields = schema.__schema__(:fields)
assert :org_id in fields, "#{inspect(schema)} missing org_id field"
end
end
@tag :architecture
test "Repo query without tenant context raises" do
FinnestCore.Tenant.clear()
assert_raise(RuntimeError, ~r/No tenant context/, fn ->
Finnest.Repo.all(Finnest.People.Employee)
end)
end
@tag :architecture
test "Repo query with org A cannot return org B rows" do
org_a = insert!(:organisation)
org_b = insert!(:organisation)
emp_b = insert!(:employee, org_id: org_b.id)
FinnestCore.Tenant.put_org_id(org_a.id)
assert [] == Finnest.Repo.all(Finnest.People.Employee)
refute Finnest.Repo.get(Finnest.People.Employee, emp_b.id)
end
end
## PII Encryption (locked Part 5)
### Two-layer encryption
Layer 1 — RDS storage-level:
- AWS-managed KMS key (commercial)
- CloudHSM-backed KMS key (IRAP)
- Always on, transparent to application
- Protects against storage media theft, snapshot exfil
Layer 2 — Application-level Cloak for truly sensitive columns:
defmodule FinnestCore.Vault do
use Cloak.Vault, otp_app: :finnest_core
@impl GenServer
def init(config) do
config = Keyword.put(config, :ciphers, [
default: {
Cloak.Ciphers.AES.GCM,
tag: "AES.GCM.V1",
key: decode_key(System.fetch_env!("CLOAK_KEY_V1")),
iv_length: 12
}
])
{:ok, config}
end
defp decode_key(b64), do: Base.decode64!(b64)
end
# Used in schemas — the field type is a thin wrapper over Cloak.Ecto.Binary:
defmodule FinnestCore.Vault.Binary do
  use Cloak.Ecto.Binary, vault: FinnestCore.Vault
end

defmodule Finnest.People.Employee do
  use Ecto.Schema
  import Ecto.Changeset

  @schema_prefix "people"
  @primary_key {:id, Ecto.UUID, autogenerate: true}
  schema "employees" do
    field :tax_file_number_encrypted, FinnestCore.Vault.Binary
    # ... other fields ...
  end
end
Columns encrypted at application level:

| Table | Column | Why |
|---|---|---|
| `people.employees` | `tax_file_number_encrypted` | TFN is legally sensitive |
| `onboard.bank_details` | `account_number_encrypted`, `bsb_encrypted` | Financial instrument |
| `onboard.tfn_declarations` | `tfn_encrypted` | Same as above |
| `onboard.super_choices` | `member_number_encrypted` | Per-employee super identifier |
| `public.users` | `mfa_secret_encrypted` | TOTP seed (where TOTP allowed) |
| `onboard.extracted_fields` | `passport_number_encrypted`, `drivers_licence_encrypted` | Document identifiers |
| `compliance.dvs_checks` | `document_number_encrypted` | Document verification payload |
Key rotation: Quarterly (aligned with secret rotation). Cloak supports cipher versions — old ciphertext decrypts with old key; new writes use new key.
Separation of duties: Storage-level key held in KMS/CloudHSM; application-level key held in Secrets Manager/SSM. A DBA with direct RDS access cannot read the application-encrypted columns without also compromising the secret store.
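Rotation could be expressed by stacking cipher versions in the vault's `init/1` (a config fragment sketch; the `CLOAK_KEY_V2` secret name is an assumption):

```elixir
# Sketch of quarterly rotation: new writes encrypt with V2 (the :default label),
# while V1 stays registered so old ciphertext still decrypts by its tag.
config = Keyword.put(config, :ciphers, [
  default: {Cloak.Ciphers.AES.GCM,
    tag: "AES.GCM.V2",
    key: decode_key(System.fetch_env!("CLOAK_KEY_V2")),  # assumed secret name
    iv_length: 12},
  retired: {Cloak.Ciphers.AES.GCM,
    tag: "AES.GCM.V1",
    key: decode_key(System.fetch_env!("CLOAK_KEY_V1")),
    iv_length: 12}
])
```

A background re-encryption job can then rewrite old rows so the retired cipher can eventually be dropped.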
## Data Flow (Five Canonical Paths)
### 1. Command (write) — the default in every domain
LiveView / REST controller
└─ Tenant plug sets {org_id, user_id, correlation_id} in Process dictionary
└─ Finnest.<Domain>.<Context>.<command>(args, actor)
├─ Ecto.Changeset.cast + validate
├─ FinnestCompliance.check/2 [if command is gated — AR-18]
├─ Repo.transaction fn ->
│ ├─ Repo.insert/update (domain row)
│ ├─ EventStore.append(%Event{...}) (same transaction — atomicity)
│ └─ (trigger fires: hash chain computed, row written)
│ end
├─ Phoenix.PubSub.broadcast("org:#{org_id}:#{domain}", event)
└─ {:ok, entity} | {:error, changeset}
Atomicity: domain write and event emission happen in the same transaction. PubSub broadcast is post-commit (outside transaction) to avoid broadcasting uncommitted state.
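The write path above can be sketched as follows (module, function, and topic names are illustrative, not a locked API):

```elixir
# Sketch: domain row + event appended in one transaction; broadcast only
# after commit so subscribers never see uncommitted state.
def assign_shift(attrs, actor) do
  changeset = ShiftAssignment.changeset(%ShiftAssignment{}, attrs)

  result =
    Repo.transaction(fn ->
      with {:ok, assignment} <- Repo.insert(changeset),
           {:ok, event} <- EventStore.append(%{
             domain: "roster",
             event_type: "shift_assigned",
             aggregate_id: assignment.id,
             payload: %{shift_id: assignment.shift_id, actor_id: actor.id}
           }) do
        {assignment, event}
      else
        {:error, reason} -> Repo.rollback(reason)
      end
    end)

  case result do
    {:ok, {assignment, event}} ->
      # Post-commit broadcast
      Phoenix.PubSub.broadcast(Finnest.PubSub, "org:#{assignment.org_id}:roster", event)
      {:ok, assignment}

    {:error, _} = error ->
      error
  end
end
```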
### 2. Event subscriber (async cross-domain)
Phoenix.PubSub delivers event to subscribing domain's handler module
└─ handler.handle(event)
└─ Oban.insert(ReactionWorker, args: %{event_id, org_id, correlation_id, ...})
└─ ReactionWorker.perform (idempotent — QJ-01)
├─ look up aggregate, apply reaction
├─ may emit further events (causation_id links chain)
└─ bounded by AI-05: max 10 events per correlation chain
Idempotency pattern: every worker checks if its effect has already been applied (via aggregate state or a dedupe marker) before doing anything observable.
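A sketch of the dedupe check (the `reactions` source and `source_event_id` column are illustrative; the event id acts as the natural idempotency key):

```elixir
# Sketch: before doing anything observable, the worker asks whether its
# effect already exists for this source event (QJ-01).
defmodule FinnestCore.Roster.ReactionDedupe do
  import Ecto.Query
  alias Finnest.Repo

  def already_applied?(event_id) do
    Repo.exists?(
      from r in "reactions",
        prefix: "roster",  # illustrative table
        where: r.source_event_id == type(^event_id, Ecto.UUID)
    )
  end
end

# In the worker:
#   def do_perform(%Oban.Job{args: %{"event_id" => event_id}}) do
#     if ReactionDedupe.already_applied?(event_id),
#       do: :ok,
#       else: apply_reaction(event_id)
#   end
```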
### 3. Query (read)
LiveView mount / API / MCP tool
└─ Finnest.<Domain>.<Queries>.<function>(params, org_id)
├─ Ecto.Query scoped by org_id (automatic via prepare_query)
├─ Explicit preloads (CQ-05) — no lazy loading
├─ Optional ETS cache read for hot paths (feature flags, credential types, awards)
└─ {:ok, results}
### 4. Agent-mediated read
User query → Orchestrator
├─ Tier-1: pattern match → route to MCP tool directly, no LLM
├─ Tier-2: LLM classifies → one or many MCP tool calls → LLM composes response
└─ Every tool call logged to agents.tool_audit with cost + ms + correlation_id
Full detail in agents.md.
### 5. Mobile offline sync
Full detail in mobile.md. Summary:
Flutter app offline → SQLite event queue (mirrors backend event shape)
└─ on connectivity: POST /api/v1/sync { events: [...], last_sync_at }
└─ server-side reconciliation per conflict rules (B07)
├─ clock events: client wins (timestamp authoritative)
├─ roster changes: server wins (supervisor may have changed)
├─ leave requests: server decides (may be approved/rejected)
└─ time cards: merge with server validation
└─ return {confirmations, conflicts, server_updates}
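The B07 conflict rules can be sketched as a resolution function (event-family strings and return shapes are illustrative, not the locked reconciliation API):

```elixir
# Sketch: one clause per event family, mirroring the rules above.
defmodule FinnestMobile.Sync.Conflicts do
  def resolve("clock_event", client, _server), do: {:accept, client}    # client wins
  def resolve("roster_change", _client, server), do: {:keep, server}    # server wins
  def resolve("leave_request", _client, server), do: {:keep, server}    # server decides
  def resolve("timecard", client, server), do: {:merge, client, server} # validated merge
end
```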
## Strangler Fig Migration
### Migration topology
MySQL (admin_central, admin_atslive, actatek)
│ (read-only via Finnest.V2Repo + MyXQL)
│
│ Oban migration workers stream v2 data
│ Write to Finnest.Repo (Postgres)
│ Populate public.id_mappings
│
▼
PostgreSQL (Finnest primary)
│
│ During transition:
│ - Phase 1-2: Finnest read-copy, v2 source of truth
│ - Phase 3-6: Finnest becomes source of truth per domain as cutover completes
│ - Phase 7-8: v2 read-only archive → decommission
│
▼
Applications query Postgres
### Dual Ecto repos
# config/config.exs
config :finnest, ecto_repos: [Finnest.Repo, Finnest.V2Repo]
# Primary repo — Postgres
defmodule Finnest.Repo do
use Ecto.Repo, otp_app: :finnest_core, adapter: Ecto.Adapters.Postgres
# ... prepare_query hook ...
end
# Migration read-only repo — MySQL
defmodule Finnest.V2Repo do
use Ecto.Repo, otp_app: :finnest_core, adapter: Ecto.Adapters.MyXQL, read_only: true
# No migrations directory
# Read-only at connection level: user has SELECT only
# Schemas use @primary_key false if v2 table has no PK
end
# Example read-only schema for v2 data
defmodule Finnest.V2.Candidate do
use Ecto.Schema
# DA-02: explicit fields only, no `field :any, :integer` fallback
@primary_key {:id, :integer, []}
schema "candidates" do
field :first_name, :string
field :last_name, :string
# ... etc — exactly what we read, nothing more
end
end
Guarantees (AR-05, DA-01, DA-02):
- V2Repo connection credentials have `SELECT` only (no INSERT/UPDATE/DELETE permission at DB level)
- `read_only: true` at Ecto level prevents accidental `insert`/`update`/`delete` calls
- Schemas declare only fields we read — no `SELECT *` queries
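The DB-level guarantee might be provisioned like this on the MySQL side (user name is illustrative; the password comes from the secret store):

```sql
-- Sketch: SELECT-only user for the V2Repo connection (MySQL).
CREATE USER 'finnest_v2_reader'@'%' IDENTIFIED BY '<from secret store>';
GRANT SELECT ON admin_central.* TO 'finnest_v2_reader'@'%';
GRANT SELECT ON admin_atslive.* TO 'finnest_v2_reader'@'%';
GRANT SELECT ON actatek.*       TO 'finnest_v2_reader'@'%';
```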
### Phase order (B11, supersedes B04)
Phase 1 (Weeks 1-4 post go-live):
[Finnest: Recruit] ──handoff──→ [v2: Onboard, CMS, Roster, T&A, Payroll]
Phase 2 (Weeks 5-8):
[Finnest: Recruit, Onboard] ──handoff──→ [v2: CMS, Roster, T&A, Payroll]
Phase 3 (Weeks 9-16):
[Finnest: Recruit, Onboard, CMS, Roster] ──handoff──→ [v2: T&A, Payroll]
Phase 4 (Weeks 17-24):
[Finnest: + T&A, Reporting] ──handoff──→ [v2: Payroll only]
Phase 5 (Weeks 25-32):
[Finnest: Everything + Mobile + Pact] ──→ [v2: read-only archive]
### Per-phase migration procedure
Pre-migration:
- Data quality audit — run read-only SQL against v2 for orphans, duplicates, encoding issues (B04 audit checklist)
- Dry-run — migration workers run with a `write: false` flag; output row-count parity report + statistical diff
- Stakeholder sign-off on audit findings and dry-run report
Migration window:
- Migration workers queued in Oban
- Read from V2Repo, transform, write to primary Repo, populate id_mappings
- Per-entity correlation_id propagated so anomalies trace back to source
- Progress tracked in a `migration_runs` table (per phase, per batch, per entity type)
Post-migration:
- Row-count parity check: `v2 count == Finnest active count`
- Sampling validation: 100 random entities compared field-by-field
- Monetary statistical diff: sum of all monetary columns must match to the cent
- Handoff switch: app starts reading the migrated domain from Finnest; v2 becomes read-only for that domain
- 3-month parallel access window before v2 is decommissioned for the domain — users can still query v2 for historical lookups
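The parity check could be sketched against both repos like this (the v2 table name and `skip_org_scope` opt-out follow the document; the function itself is illustrative):

```elixir
# Sketch: row-count parity for one migrated domain. The Finnest side counts
# active rows only (soft-deleted rows are excluded).
defmodule FinnestMigration.Parity do
  import Ecto.Query

  def employees_parity? do
    v2 = Finnest.V2Repo.aggregate("candidates", :count)  # illustrative v2 table

    finnest =
      from(e in "employees", prefix: "people", where: is_nil(e.deleted_at))
      |> Finnest.Repo.aggregate(:count, skip_org_scope: true)

    v2 == finnest
  end
end
```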
### Rollback
- Per-phase rollback possible within 24 hours via v2→Finnest difference replay
- After 24 hours, rollback is event-replay from the event store + archived v2 snapshots
- Documented in `runbooks/migration-rollback.md` (Phase 3)
## Reference Data vs Operational Data
| Classification | Examples | Scope | Cross-schema readable? |
|---|---|---|---|
| Operational | Employees, shifts, timecards, pay runs, invoices, incidents | Per-tenant | No — events only (AR-07) |
| Reference (platform-wide) | Credential types, industry profiles, awards, award rates, labour hire licensing rules | Platform | Yes — documented exception |
| Reference (tenant-specific) | Rate cards, shift templates, roster rules | Per-tenant | No — events only |
The reference-data exception exists because:
- These are stable facts (a "forklift license" is the same fact everywhere)
- They are read-only from the application's perspective (only the admin UI writes them)
- Without the exception, every domain would duplicate credential lookups over events — operational pain for no gain
- `Compliance.check/2` already crosses the boundary under AR-18 — so the exception is small
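A sketch of the hot-path ETS cache this exception enables (the table name and API are assumptions; this is the sub-5ms credential-check read path mentioned under capacity planning):

```elixir
# Sketch: read-mostly reference data cached in ETS; DB fallback is rare.
defmodule FinnestCompliance.CredentialTypeCache do
  @table :credential_types_cache

  def init do
    :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])
  end

  def get(code) do
    case :ets.lookup(@table, code) do
      [{^code, credential_type}] -> {:ok, credential_type}
      [] -> load_from_db(code)
    end
  end

  defp load_from_db(_code) do
    # DB fallback elided in this sketch; on hit it repopulates the cache:
    #   :ets.insert(@table, {code, credential_type})
    # Reference data, so no org scoping is needed (documented AR-07 exception).
    {:error, :miss}
  end
end
```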
## Data Retention Matrix
| Category | Commercial | IRAP | Mechanism |
|---|---|---|---|
| Operational data (employees, shifts, etc.) | Active + 3 years post-termination | Active + 7 years | Soft delete (deleted_at); periodic hard-delete-after-retention job |
| Event store | Permanent (partitioned monthly) | Same (7 years required, kept permanently for data flywheel) | Partition archival; retained partitions accessible via same query |
| Agent sessions / messages | 90 days rolling | 7 years | Partitioned or rolling cleanup |
| Agent tool audit | 90 days | 7 years | Same |
| Audit log entries | 3 years | 7 years | Same as event store |
| Documents (S3) | 3 years post-termination | 7 years | S3 lifecycle policy (commercial) / Object Lock compliance mode (IRAP) |
| Backups | 30 daily, 12 monthly, 5 yearly | 30 daily, 12 monthly, 7 yearly | DA-09, IR-10 |
| Logs (CloudWatch) | 90 days | 7 years | CloudWatch retention policy |
## Capacity Planning (repeated from main for convenience)
### Phase 1 (5000 employees target)
| Resource | Estimate |
|---|---|
| BEAM node | 1 × c6g.xlarge (4 vCPU / 8 GB) |
| Postgres | 1 × db.r6g.large (2 vCPU / 16 GB / 100 GB gp3) |
| Event store growth | ~500K events/month |
| Index build time | ~30 min (initial) / ~2 min per partition (monthly) |
| Migration window per domain | ~2-4 hours for 5K employees |
| id_mappings size | ~5K rows initially, grows with migration phases |
### Hot query patterns (target p95 latencies)

| Query | Table | Target p95 | Notes |
|---|---|---|---|
| Employee list by org | `people.employees` | <50ms | Index on (org_id, deleted_at) |
| Today's shifts | `roster.shifts` | <50ms | Index on (org_id, date_range) |
| Clock-in insert | `timekeep.clock_events` | <100ms | Atomic insert + Compliance.check in transaction |
| Credential check | `compliance.credential_types` via ETS | <5ms | Hot cache; DB fallback rare |
| Event store append | `events.domain_events_<month>` | <20ms | Append-only partition |
| Agent tool audit insert | `agents.tool_audit` | <20ms | Separate high-volume partition (future) |
## Open Questions

| # | Item | Phase | Owner |
|---|---|---|---|
| DA-OI-01 | `agents.tool_audit` partitioning strategy — monthly like events, or by org? Consider when volume >10M rows | Phase 2 | Infra |
| DA-OI-02 | pgvector index when L2/L3 memory embeddings stabilise — ivfflat or hnsw? | Phase 3 | Agents team |
| DA-OI-03 | Event store archival: move >3-year partitions to S3 Parquet? When? | Phase 4 | Infra |
| DA-OI-04 | Cross-tenant anonymisation job for L3 memories — pseudonymisation approach | Phase 3 | Agents team |
| DA-OI-05 | id_mappings retention: kept permanently for audit, or archived after 5 years? | Phase 4 | Ops |
## References

- `../brainstorms/brainstorm-04-data-model-migration.md` — data model design discussion
- `../brainstorms/brainstorm-05-multi-industry-design.md` — compliance schema design
- `../brainstorms/brainstorm-11-traffio-laravel-migration-naming.md` — phase order decision
- `../10-GUARDRAILS.md` — §6 (DA-01–DA-14), §2 (AR-*), §3 (SE-*)
- `./architecture.md` — main architecture (Part 5 summary)
- `./agents.md` — agent data interactions
- `./irap.md` — IRAP-specific data handling
- `./mobile.md` — mobile offline event model