Finnest Data Architecture

Date: 2026-04-16
Status: Draft
Scope: Data model, query patterns, tenant enforcement, event store, migration approach. Expands on the summary in architecture.md Part 5.

Related documents: architecture.md (main), ../brainstorms/brainstorm-04-data-model-migration.md, ../brainstorms/brainstorm-05-multi-industry-design.md, ../brainstorms/brainstorm-11-traffio-laravel-migration-naming.md.


Schema Topology

One PostgreSQL cluster. 21 schemas — 18 domain + events + agents + public.

| Schema | Tables | Tier | Key Entities |
|---|---|---|---|
| public | ~10 | Core | organisations, users, offices, feature_flags, audit_log, notifications, tags, id_mappings |
| people | ~10 | 1 | employees, employee_qualifications, leave_requests, leave_balances, contracts, unavailability |
| recruit | ~12 | 1 | job_orders, candidate_pools, candidate_assessments, scoring_results, outreach_log, talent_pools, drip_campaigns |
| onboard | ~14 | 1 | onboarding_profiles, document_verifications, verifiable_documents, verification_stages, extracted_fields, identity_checklists, tfn_declarations, super_choices, bank_details, pipeline_steps |
| roster | ~12 | 1 | shifts, shift_templates, shift_assignments, shift_codes, roster_rules, availability, demand_forecasts, shift_swaps, shift_bids, block_lists |
| timekeep | ~10 | 1 | timecards, timecard_entries, timecard_allowances, clock_events, terminals, geo_locations, breaks, journals, dockets |
| reach | ~8 | 1 | conversations, messages, message_deliveries, channels, templates, handoff_records, news_feed, announcements |
| pulse | ~7 | 1 | automation_runs, automation_results, optimisation_runs, anomaly_detections, predictive_models, dashboards |
| payroll | ~15 | 2 | pay_runs, pay_lines, tax_calculations, super_calculations, deductions, stp_submissions, invoices, payments, rate_cards |
| clients | ~10 | 2 | clients, contacts, sites, agencies, rate_cards, rate_configurations, job_costings, client_hierarchies, opportunities |
| safety | ~12 | 2 | incidents, investigations, hazards, risk_assessments, inspections, swms_documents, corrective_actions, drug_alcohol_tests, custom_forms |
| assets | ~10 | 2 | equipment, equipment_assignments, maintenance_schedules, work_orders, fleet_vehicles, prestart_checklists |
| quotes | ~8 | 3 | leads, quotes, jobs, tasks, milestones, agreements, signatures |
| learn | ~8 | 3 | courses, tutors, enrollments, progress, certifications, learning_paths, reinduction_schedules |
| benefits | ~6 | 3 | programs, enrollments, perks, recognitions, rewards_points, ewa_transactions |
| compliance | ~12 | 4 | credential_types, industry_profiles, awards, award_classifications, award_rates, compliance_rules, ongoing_monitors, compliance_scores, labour_hire_licensing, dvs_checks, pep_screening |
| fatigue | ~6 | 4 | fatigue_rules, driver_hours, rest_periods, ewd_entries, fitness_assessments |
| clearance | ~5 | 4 | clearance_applications, clearance_statuses, vetting_records, program_separations |
| performance | ~10 | 5 | goals, reviews, feedback, surveys, enps_scores, calibrations, development_plans, skills_matrix |
| events | 1 | Infra | domain_events (partitioned by month) |
| agents | ~5 | Infra | sessions, messages, memories, budget_limits, tool_audit |
| Total | ~195 | | |

Design Principles (non-negotiable)

From B04 + Commandment #16 + DA-* guardrails:

  1. Every table has org_id — except public.organisations itself (DA-11)
  2. Every table has inserted_at, updated_at, deleted_at — soft delete via timestamp (DA-12)
  3. Every primary key is a UUID (gen_random_uuid()) — no auto-increment (DA-13)
  4. JSONB for industry-specific extension fields — the metadata column pattern
  5. Typed Ecto embeds for structured JSON — validate at application level
  6. No cross-schema JOINs for operational queries (AR-07) — events instead
  7. Reference-data schemas readable cross-schema — compliance.credential_types, .industry_profiles, .awards only
  8. PII encrypted at rest — TFN, bank, tax, passport, MFA secrets via Cloak AES-256-GCM (SE-13)
  9. Leading org_id on every operational index — tenant-first
  10. Partial indexes WHERE deleted_at IS NULL for active-record queries
  11. All reversible migrations (CQ-11)
  12. UTF-8 encoding, en_AU.UTF-8 collation (DA-05)
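Principles 1–4, 9 and 10 combine into a canonical table shape. An illustrative skeleton (columns other than the mandated ones are placeholders):

```sql
-- Hypothetical domain table applying DA-11/12/13 plus the index rules
CREATE TABLE roster.shifts (
  id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),         -- DA-13: UUID PK
  org_id      UUID NOT NULL REFERENCES public.organisations(id),  -- DA-11: tenant column
  -- ... domain columns ...
  metadata    JSONB NOT NULL DEFAULT '{}',                        -- principle 4: extension fields
  inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),                 -- DA-12
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  deleted_at  TIMESTAMPTZ                                         -- soft delete via timestamp
);

-- Principles 9 + 10: leading org_id, partial index for active records
CREATE INDEX idx_shifts_org_active ON roster.shifts(org_id)
  WHERE deleted_at IS NULL;
```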

Core Entities

public.organisations — tenant root

CREATE TABLE public.organisations (
  id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  name                  VARCHAR NOT NULL,
  slug                  VARCHAR UNIQUE NOT NULL,
  abn                   VARCHAR,                           -- Australian Business Number
  industry_profile_id   UUID REFERENCES compliance.industry_profiles(id),
  subscription_tier     VARCHAR NOT NULL DEFAULT 'tier1',
  settings              JSONB NOT NULL DEFAULT '{}',       -- ip_allowlist, agent_action_mode, etc.
  irap_classified       BOOLEAN NOT NULL DEFAULT false,    -- routing/enforcement flag
  inserted_at           TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at            TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  deleted_at            TIMESTAMPTZ
);

CREATE UNIQUE INDEX idx_organisations_slug ON public.organisations(slug) WHERE deleted_at IS NULL;
CREATE INDEX idx_organisations_abn ON public.organisations(abn) WHERE abn IS NOT NULL;

public.users — merged from v2's three user tables (B04)

CREATE TABLE public.users (
  id                      UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  org_id                  UUID NOT NULL REFERENCES public.organisations(id),
  email                   VARCHAR NOT NULL,
  email_citext            CITEXT GENERATED ALWAYS AS (LOWER(email)) STORED,
  hashed_password         VARCHAR NOT NULL,                 -- Argon2
  role                    VARCHAR NOT NULL,                 -- director|admin|staff|payroll|supervisor|worker|client_contact
  mfa_enabled             BOOLEAN NOT NULL DEFAULT false,
  mfa_method              VARCHAR,                          -- fido2|totp (totp only in commercial for non-admin)
  mfa_secret_encrypted    BYTEA,                            -- AES-256-GCM via Cloak
  fido2_credentials       JSONB DEFAULT '[]',               -- registered public keys
  last_login_at           TIMESTAMPTZ,
  last_login_ip           INET,
  session_timeout_seconds INTEGER,                          -- null = use config default
  inserted_at             TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  deleted_at              TIMESTAMPTZ
);

CREATE UNIQUE INDEX idx_users_org_email ON public.users(org_id, email_citext) WHERE deleted_at IS NULL;
CREATE INDEX idx_users_role ON public.users(org_id, role) WHERE deleted_at IS NULL;

public.id_mappings — migration backbone (B04 Insight 2)

Populated by migration workers during Strangler Fig; essential for cross-referencing v2 data to Finnest UUIDs.

CREATE TABLE public.id_mappings (
  v2_database       VARCHAR NOT NULL,                       -- 'admin_central' | 'admin_atslive' | 'actatek'
  v2_table          VARCHAR NOT NULL,
  v2_id             BIGINT NOT NULL,
  finnest_schema    VARCHAR NOT NULL,
  finnest_table     VARCHAR NOT NULL,
  finnest_id        UUID NOT NULL,
  org_id            UUID NOT NULL REFERENCES public.organisations(id),
  migrated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  migration_batch   VARCHAR,                                -- which migration run produced this row
  PRIMARY KEY (v2_database, v2_table, v2_id)
);

CREATE INDEX idx_id_mappings_finnest ON public.id_mappings(finnest_schema, finnest_table, finnest_id);
CREATE INDEX idx_id_mappings_org ON public.id_mappings(org_id);
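During cutover, resolving a v2 foreign key to its Finnest UUID is a point lookup on the composite primary key; for example (values illustrative):

```sql
-- Resolve a v2 candidate id to its Finnest counterpart (illustrative values)
SELECT finnest_schema, finnest_table, finnest_id
FROM public.id_mappings
WHERE v2_database = 'admin_central'
  AND v2_table    = 'candidates'
  AND v2_id       = 12345;
```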

people.employees — single-record lifecycle (B12 H1)

CREATE TABLE people.employees (
  id                          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  org_id                      UUID NOT NULL REFERENCES public.organisations(id),
  user_id                     UUID REFERENCES public.users(id),   -- nullable (not all employees are users)
  employee_number             VARCHAR,
  first_name                  VARCHAR NOT NULL,
  last_name                   VARCHAR NOT NULL,
  preferred_name              VARCHAR,
  date_of_birth               DATE,
  gender                      VARCHAR,
  email                       VARCHAR,
  phone                       VARCHAR,
  address                     JSONB,
  emergency_contact           JSONB,
  tax_file_number_encrypted   BYTEA,                              -- Cloak AES-256-GCM (SE-13)
  employment_type             VARCHAR NOT NULL,                   -- casual|part_time|full_time|contractor
  employment_status           VARCHAR NOT NULL DEFAULT 'onboarding',
  start_date                  DATE,
  termination_date            DATE,
  primary_office_id           UUID REFERENCES public.offices(id),
  primary_client_id           UUID,                               -- refs clients.clients (cross-schema; read-only)
  metadata                    JSONB NOT NULL DEFAULT '{}',        -- industry-specific extended fields
  inserted_at                 TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at                  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  deleted_at                  TIMESTAMPTZ
);

CREATE INDEX idx_employees_org ON people.employees(org_id);
CREATE INDEX idx_employees_active ON people.employees(org_id, employment_status) WHERE deleted_at IS NULL;
CREATE INDEX idx_employees_name ON people.employees(org_id, last_name, first_name) WHERE deleted_at IS NULL;
CREATE INDEX idx_employees_user ON people.employees(user_id) WHERE user_id IS NOT NULL;
CREATE INDEX idx_employees_number ON people.employees(org_id, employee_number) WHERE employee_number IS NOT NULL;

compliance.credential_types — hierarchical credential registry (B05)

CREATE TABLE compliance.credential_types (
  id                        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  parent_id                 UUID REFERENCES compliance.credential_types(id),
  code                      VARCHAR UNIQUE NOT NULL,              -- e.g., "hrwl_forklift"
  name                      VARCHAR NOT NULL,
  category                  VARCHAR NOT NULL,                     -- high_risk_work|construction|mining|...
  nationally_recognized     BOOLEAN NOT NULL DEFAULT false,
  issuing_jurisdictions     JSONB NOT NULL DEFAULT '[]',          -- ["NSW","VIC",...]
  mutual_recognition        BOOLEAN NOT NULL DEFAULT true,
  renewal_period_months     INTEGER,                              -- NULL = no expiry
  prerequisites             JSONB NOT NULL DEFAULT '[]',          -- [credential_type_ids]
  industries                JSONB NOT NULL DEFAULT '[]',          -- ["construction","mining",...]
  verification_method       VARCHAR NOT NULL,                     -- document|api|manual|self_declared
  document_templates        JSONB NOT NULL DEFAULT '{}',
  metadata                  JSONB NOT NULL DEFAULT '{}',
  inserted_at               TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at                TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  deleted_at                TIMESTAMPTZ
);

-- Seeded with 100+ credential types across all industries (B05 seed list)

Note: This table is a reference-data exception to AR-07 — readable cross-schema by 8+ domains. No org_id because credentials are platform-wide facts (a "forklift license" is the same everywhere).

compliance.industry_profiles — composable per-org (B05)

CREATE TABLE compliance.industry_profiles (
  id                            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  slug                          VARCHAR UNIQUE NOT NULL,
  display_name                  VARCHAR NOT NULL,
  description                   TEXT,
  required_credential_types     JSONB NOT NULL DEFAULT '[]',      -- credential_type_ids
  recommended_credential_types  JSONB NOT NULL DEFAULT '[]',
  enabled_modules               JSONB NOT NULL DEFAULT '[]',      -- ["safety","fatigue","clearance"]
  onboarding_steps              JSONB NOT NULL DEFAULT '[]',
  compliance_rules              JSONB NOT NULL DEFAULT '{}',
  terminology                   JSONB NOT NULL DEFAULT '{}',      -- {"worker":"operative","shift":"deployment"}
  dashboard_config              JSONB NOT NULL DEFAULT '{}',
  report_templates              JSONB NOT NULL DEFAULT '{}',
  metadata                      JSONB NOT NULL DEFAULT '{}',
  inserted_at                   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at                    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  deleted_at                    TIMESTAMPTZ
);

CREATE TABLE compliance.org_industry_profiles (
  org_id                UUID NOT NULL REFERENCES public.organisations(id),
  industry_profile_id   UUID NOT NULL REFERENCES compliance.industry_profiles(id),
  activated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  custom_overrides      JSONB NOT NULL DEFAULT '{}',
  PRIMARY KEY (org_id, industry_profile_id)
);

Ten profiles seeded by end of Phase 3: labour_hire, construction, mining, logistics, defence, retail, white_collar, traffic, civil, engineering. Composable — an org activates multiple profiles, and their requirements are unioned.
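The union of requirements across an org's active profiles can be computed in one query; a sketch, assuming required_credential_types holds a JSONB array of credential_type ids:

```sql
-- All required credential types for one org, deduplicated across its
-- active industry profiles (:org_id is a bind parameter)
SELECT DISTINCT ct.id, ct.code, ct.name
FROM compliance.org_industry_profiles oip
JOIN compliance.industry_profiles ip
  ON ip.id = oip.industry_profile_id
CROSS JOIN LATERAL
  jsonb_array_elements_text(ip.required_credential_types) AS req(id)
JOIN compliance.credential_types ct
  ON ct.id = req.id::uuid
WHERE oip.org_id = :org_id;
```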


Event Store (the load-bearing table)

CREATE SCHEMA events;

CREATE TABLE events.domain_events (
  id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  domain         VARCHAR(50)  NOT NULL,                       -- 'roster', 'payroll'
  event_type     VARCHAR(100) NOT NULL,                       -- 'shift_assigned', 'pay_run_finalised'
  aggregate_id   UUID,                                         -- entity the event belongs to
  org_id         UUID NOT NULL,                                -- tenant isolation
  payload        JSONB NOT NULL,                               -- event data (schema-versioned)
  metadata       JSONB NOT NULL DEFAULT '{}',                  -- correlation_id, causation_id, actor_id,
                                                                -- agent_id, classification, schema_version
  inserted_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  prev_hash      BYTEA,                                         -- tamper-evident hash chain
  hash           BYTEA NOT NULL                                 -- sha256(prev_hash || id || org_id || ...)
) PARTITION BY RANGE (inserted_at);

-- Monthly partitions (DA-14) — ensures query locality and retention management
CREATE TABLE events.domain_events_2026_04 PARTITION OF events.domain_events
  FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE events.domain_events_2026_05 PARTITION OF events.domain_events
  FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- ... (monthly partition creation runs as scheduled Oban job)

-- Indexes (per partition, inherited)
CREATE INDEX idx_events_org_domain_type ON events.domain_events (org_id, domain, event_type, inserted_at DESC);
CREATE INDEX idx_events_aggregate ON events.domain_events (aggregate_id) WHERE aggregate_id IS NOT NULL;
CREATE INDEX idx_events_correlation ON events.domain_events ((metadata->>'correlation_id'));

-- Immutability trigger (AR-17)
CREATE OR REPLACE FUNCTION events.prevent_modification()
RETURNS TRIGGER AS $$
BEGIN
  RAISE EXCEPTION 'Events are immutable. UPDATE and DELETE are not allowed.';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER no_modify_events
  BEFORE UPDATE OR DELETE ON events.domain_events
  FOR EACH ROW EXECUTE FUNCTION events.prevent_modification();

-- Hash chain on insert (SE-21, IR-10)
CREATE OR REPLACE FUNCTION events.compute_hash()
RETURNS TRIGGER AS $$
DECLARE
  last_hash BYTEA;
  hash_input TEXT;
BEGIN
  -- Get the most recent hash for this org (chain per-org)
  SELECT hash INTO last_hash
  FROM events.domain_events
  WHERE org_id = NEW.org_id
  ORDER BY inserted_at DESC
  LIMIT 1;

  NEW.prev_hash := last_hash;

  hash_input := COALESCE(encode(last_hash, 'hex'), '') ||
                NEW.id::text ||
                NEW.org_id::text ||
                NEW.domain ||
                NEW.event_type ||
                COALESCE(NEW.aggregate_id::text, '') ||
                NEW.payload::text ||
                NEW.inserted_at::text;

  NEW.hash := digest(hash_input, 'sha256');

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER hash_chain_on_insert
  BEFORE INSERT ON events.domain_events
  FOR EACH ROW EXECUTE FUNCTION events.compute_hash();
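The scheduled partition-creation job mentioned in the comment above could be sketched as an Oban worker (module name and queue are illustrative):

```elixir
defmodule FinnestCore.Events.PartitionWorker do
  @moduledoc """
  Creates next month's events partition ahead of need (DA-14).
  Illustrative sketch — run monthly via Oban's cron plugin.
  """
  use Oban.Worker, queue: :maintenance

  @impl Oban.Worker
  def perform(_job) do
    # First day of next month, and of the month after (partition upper bound)
    from = Date.utc_today() |> Date.beginning_of_month() |> Date.add(32) |> Date.beginning_of_month()
    to = from |> Date.add(32) |> Date.beginning_of_month()
    suffix = Calendar.strftime(from, "%Y_%m")

    Finnest.Repo.query!("""
    CREATE TABLE IF NOT EXISTS events.domain_events_#{suffix}
      PARTITION OF events.domain_events
      FOR VALUES FROM ('#{from}') TO ('#{to}')
    """)

    :ok
  end
end
```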

Three responsibilities, one table (B02 Insight 3):

  1. Cross-domain event bus — Phoenix.PubSub broadcasts on insert (via Postgres LISTEN/NOTIFY or Elixir broadcast triggered by EventStore.append/1)
  2. IRAP-compliant audit log — append-only + immutable + hash-chained satisfies 7-year retention and tamper-evidence (IR-10, SE-21)
  3. Data flywheel — historical events enable anomaly detection, agent memory L3, industry intelligence aggregations

Hash chain verification (monthly script):

defmodule FinnestCore.EventStore.Integrity do
  import Ecto.Query
  alias Finnest.Repo

  @doc "Walks the chain for a given org, asserting integrity. Returns {:ok, last_hash} or {:error, breaks_at_event_id}."
  def verify(org_id) do
    events =
      from(e in "domain_events",
        prefix: "events",
        where: e.org_id == ^org_id,
        order_by: [asc: e.inserted_at],
        select: %{
          id: e.id,
          org_id: e.org_id,
          domain: e.domain,
          event_type: e.event_type,
          aggregate_id: e.aggregate_id,
          payload: e.payload,
          inserted_at: e.inserted_at,
          prev_hash: e.prev_hash,
          hash: e.hash
        }
      )
      |> Repo.all()

    Enum.reduce_while(events, {:ok, nil}, fn event, {:ok, last_hash} ->
      if event.prev_hash == last_hash and recompute_hash(event) == event.hash do
        {:cont, {:ok, event.hash}}
      else
        {:halt, {:error, event.id}}
      end
    end)
  end
end
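recompute_hash/1 must reproduce the SQL trigger's concatenation byte-for-byte, or verification will report false breaks. A sketch — it assumes UUID and timestamp fields have been cast to the same textual form Postgres used, and note that re-encoding a decoded payload map may not match Postgres's `payload::text` rendering (selecting `payload::text` in the query sidesteps that):

```elixir
# Mirrors events.compute_hash() — field order and separators must match exactly.
defp recompute_hash(event) do
  input =
    (if event.prev_hash, do: Base.encode16(event.prev_hash, case: :lower), else: "") <>
      event.id <>
      event.org_id <>
      event.domain <>
      event.event_type <>
      (event.aggregate_id || "") <>
      event.payload <>            # assumed pre-cast to text, matching payload::text
      event.inserted_at

  :crypto.hash(:sha256, input)
end
```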

Agents Schema

CREATE SCHEMA agents;

CREATE TABLE agents.sessions (
  id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  org_id        UUID NOT NULL REFERENCES public.organisations(id),
  user_id       UUID NOT NULL REFERENCES public.users(id),
  started_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  ended_at      TIMESTAMPTZ,
  metadata      JSONB NOT NULL DEFAULT '{}',
  inserted_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE agents.messages (
  id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  session_id     UUID NOT NULL REFERENCES agents.sessions(id),
  org_id         UUID NOT NULL,
  role           VARCHAR NOT NULL,               -- user|assistant|tool|system
  content        TEXT,                           -- user/assistant message content (retention per classification)
  tool_calls     JSONB,                          -- array of tool invocations made by this message
  tokens         INTEGER,                        -- count for assistant messages
  latency_ms     INTEGER,                        -- time to generate
  inserted_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE agents.memories (
  id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  level          VARCHAR NOT NULL,               -- 'l2_tenant' | 'l3_platform'
  org_id         UUID,                           -- NULL for L3 (platform-wide anonymised)
  scope          VARCHAR,                        -- e.g. 'client:<id>' or 'industry:mining'
  memory_type    VARCHAR NOT NULL,               -- preference|pattern|fact
  content        JSONB NOT NULL,
  embedding      VECTOR(1536),                   -- pgvector (future phase)
  source         JSONB,                          -- {session_id, correlation_id, agent_id, confirmed_by}
  inserted_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  last_used_at   TIMESTAMPTZ,
  use_count      INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_memories_org_scope ON agents.memories(org_id, scope) WHERE level = 'l2_tenant';
CREATE INDEX idx_memories_platform_scope ON agents.memories(scope) WHERE level = 'l3_platform';
-- pgvector index added in future phase:
-- CREATE INDEX idx_memories_embedding ON agents.memories USING ivfflat (embedding vector_cosine_ops);

CREATE TABLE agents.budget_limits (
  org_id          UUID PRIMARY KEY REFERENCES public.organisations(id),
  daily_limit     DECIMAL(10,2) NOT NULL,
  weekly_limit    DECIMAL(10,2) NOT NULL,
  monthly_limit   DECIMAL(10,2) NOT NULL,
  daily_used      DECIMAL(10,2) NOT NULL DEFAULT 0,
  weekly_used     DECIMAL(10,2) NOT NULL DEFAULT 0,
  monthly_used    DECIMAL(10,2) NOT NULL DEFAULT 0,
  day_rolling     DATE NOT NULL DEFAULT CURRENT_DATE,
  week_rolling    DATE NOT NULL DEFAULT CURRENT_DATE,
  month_rolling   DATE NOT NULL DEFAULT DATE_TRUNC('month', CURRENT_DATE),
  warning_sent_at TIMESTAMPTZ,
  circuit_open_at TIMESTAMPTZ,
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE agents.tool_audit (
  id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  org_id          UUID NOT NULL,
  session_id      UUID,
  tool_name       VARCHAR NOT NULL,
  category        VARCHAR NOT NULL,               -- read|propose|execute|restricted
  input_hash      BYTEA NOT NULL,                 -- hash of input (not plaintext — PII safe)
  output_hash     BYTEA,
  prompt_hash     BYTEA,                          -- for LLM calls
  response_hash   BYTEA,
  model           VARCHAR,
  input_tokens    INTEGER,
  output_tokens   INTEGER,
  cache_read_tokens     INTEGER,
  cache_creation_tokens INTEGER,
  cost_aud        DECIMAL(10,4),
  duration_ms     INTEGER NOT NULL,
  result          VARCHAR NOT NULL,               -- success|error|rate_limited|budget_exceeded|timeout
  error_class     VARCHAR,                        -- exception module if error
  correlation_id  UUID NOT NULL,
  inserted_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_tool_audit_org_time ON agents.tool_audit(org_id, inserted_at DESC);
CREATE INDEX idx_tool_audit_correlation ON agents.tool_audit(correlation_id);
CREATE INDEX idx_tool_audit_tool ON agents.tool_audit(tool_name, inserted_at DESC);

Tenant Enforcement

Custom Finnest.Repo.prepare_query hook (locked Part 5)

defmodule Finnest.Repo do
  use Ecto.Repo, otp_app: :finnest_core, adapter: Ecto.Adapters.Postgres

  @doc """
  Automatically scopes every query to current_org_id.

  Reads org_id from Process dictionary set by FinnestWeb.Plugs.Tenant.
  Raises if no tenant context and the query isn't explicitly opted out.

  Opt-out: pass skip_org_scope: true in query options.
  Used for:
    - migrations (cross-tenant by nature)
    - audit reports (intentionally cross-tenant, privileged)
    - id_mappings lookups during migration
  """
  def prepare_query(_operation, query, opts) do
    case Keyword.get(opts, :skip_org_scope) do
      true -> 
        {query, opts}
      _ ->
        org_id = FinnestCore.Tenant.current_org_id!()
        {scope_to_org(query, org_id), opts}
    end
  end

  defp scope_to_org(query, org_id) do
    # Uses Ecto.Query.where/3 with query inspection to inject org_id filter
    # if not already present.
    case query_has_org_scope?(query) do
      true -> query
      false -> Ecto.Query.where(query, [r], r.org_id == ^org_id)
    end
  end

  # ... helpers ...
end

defmodule FinnestCore.Tenant do
  @process_key :finnest_current_org_id

  def current_org_id!, do: Process.get(@process_key) || raise("No tenant context set")
  def current_org_id, do: Process.get(@process_key)
  def put_org_id(org_id), do: Process.put(@process_key, org_id)
  def clear, do: Process.delete(@process_key)
end

defmodule FinnestWeb.Plugs.Tenant do
  @behaviour Plug
  import Plug.Conn

  @impl Plug
  def init(opts), do: opts

  @impl Plug
  def call(conn, _opts) do
    case get_session(conn, :current_user) do
      %{org_id: org_id} = user ->
        FinnestCore.Tenant.put_org_id(org_id)

        conn
        |> assign(:current_user, user)
        |> assign(:current_org_id, org_id)

      nil ->
        conn
        |> send_resp(401, "")
        |> halt()
    end
  end
end

Oban worker tenant context

defmodule FinnestCore.ObanWorker do
  @moduledoc "Base for all workers — restores tenant context from job args."

  defmacro __using__(opts) do
    quote do
      use Oban.Worker, unquote(opts)

      @impl Oban.Worker
      def perform(%Oban.Job{args: %{"org_id" => org_id} = args} = job) do
        FinnestCore.Tenant.put_org_id(org_id)
        FinnestCore.CorrelationId.put(args["correlation_id"] || job.id)

        try do
          do_perform(job)
        after
          FinnestCore.Tenant.clear()
          FinnestCore.CorrelationId.clear()
        end
      end

      def do_perform(_job), do: raise("do_perform/1 not implemented")
      defoverridable do_perform: 1
    end
  end
end
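A concrete worker built on the base module looks like this (domain function and queue names are illustrative) — tenant context is restored before do_perform/1 runs, so Repo calls inside it are already scoped:

```elixir
defmodule Finnest.Roster.PublishRosterWorker do
  use FinnestCore.ObanWorker, queue: :roster

  # Note: every enqueue must include "org_id" in args, or perform/1 won't match.
  def do_perform(%Oban.Job{args: %{"roster_id" => roster_id}}) do
    # Already tenant-scoped via the base module's perform/1
    Finnest.Roster.publish(roster_id)
  end
end

# Enqueue with tenant context in the args:
# %{org_id: org.id, roster_id: roster.id}
# |> Finnest.Roster.PublishRosterWorker.new()
# |> Oban.insert()
```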

Architecture test asserting enforcement

defmodule Finnest.ArchitectureTest.TenantIsolation do
  use ExUnit.Case, async: false

  @tag :architecture
  test "every Ecto schema has org_id field (except allowlist)" do
    allowlist = [
      Finnest.Public.Organisation,         # tenant root
      Compliance.CredentialType,            # platform-wide reference data
      Compliance.IndustryProfile,           # platform-wide reference data
      Compliance.Award,                     # platform-wide reference data
      Compliance.AwardClassification,       # ditto
      Compliance.AwardRate,                 # ditto
      Finnest.Public.IdMapping              # cross-tenant by nature during migration
    ]

    schemas = all_ecto_schemas() -- allowlist

    for schema <- schemas do
      fields = schema.__schema__(:fields)
      assert :org_id in fields, "#{inspect(schema)} missing org_id field"
    end
  end

  @tag :architecture
  test "Repo query without tenant context raises" do
    FinnestCore.Tenant.clear()

    assert_raise(RuntimeError, ~r/No tenant context/, fn ->
      Finnest.Repo.all(Finnest.People.Employee)
    end)
  end

  @tag :architecture
  test "Repo query with org A cannot return org B rows" do
    org_a = insert!(:organisation)
    org_b = insert!(:organisation)
    emp_b = insert!(:employee, org_id: org_b.id)

    FinnestCore.Tenant.put_org_id(org_a.id)

    assert [] == Finnest.Repo.all(Finnest.People.Employee)
    refute Finnest.Repo.get(Finnest.People.Employee, emp_b.id)
  end
end

PII Encryption (locked Part 5)

Two-layer encryption

Layer 1 — RDS storage-level:

  • AWS-managed KMS key (commercial)
  • CloudHSM-backed KMS key (IRAP)
  • Always on, transparent to application
  • Protects against storage media theft, snapshot exfil

Layer 2 — Application-level Cloak for truly sensitive columns:

defmodule FinnestCore.Vault do
  use Cloak.Vault, otp_app: :finnest_core

  @impl Cloak.Vault
  def init(config) do
    config = Keyword.put(config, :ciphers, [
      default: {
        Cloak.Ciphers.AES.GCM,
        tag: "AES.GCM.V1",
        key: decode_key(System.fetch_env!("CLOAK_KEY_V1")),
        iv_length: 12
      }
    ])

    {:ok, config}
  end

  defp decode_key(b64), do: Base.decode64!(b64)
end

# Encrypted Ecto type backed by the vault (via cloak_ecto):
defmodule FinnestCore.Vault.Binary do
  use Cloak.Ecto.Binary, vault: FinnestCore.Vault
end

# Used in schemas:
defmodule Finnest.People.Employee do
  use Ecto.Schema
  import Ecto.Changeset

  @schema_prefix "people"
  schema "employees" do
    field :tax_file_number_encrypted, FinnestCore.Vault.Binary
    # ... other fields ...
  end
end

Columns encrypted at application level:

| Table | Column(s) | Why |
|---|---|---|
| people.employees | tax_file_number_encrypted | TFN is legally sensitive |
| onboard.bank_details | account_number_encrypted, bsb_encrypted | Financial instrument |
| onboard.tfn_declarations | tfn_encrypted | Same as above |
| onboard.super_choices | member_number_encrypted | Per-employee super identifier |
| public.users | mfa_secret_encrypted | TOTP seed (where TOTP allowed) |
| onboard.extracted_fields | passport_number_encrypted, drivers_licence_encrypted | Document identifiers |
| compliance.dvs_checks | document_number_encrypted | Document verification payload |

Key rotation: Quarterly (aligned with secret rotation). Cloak supports cipher versions — old ciphertext decrypts with old key; new writes use new key.
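In Cloak, rotation means prepending the new cipher (used for all new writes) and demoting the old one to retired (kept for decrypting existing ciphertext, selected by tag). A sketch of the post-rotation init/1 — the V2 env var name is an assumption:

```elixir
# After quarterly rotation: V2 encrypts, V1 only decrypts legacy ciphertext.
def init(config) do
  config =
    Keyword.put(config, :ciphers,
      default: {Cloak.Ciphers.AES.GCM,
                tag: "AES.GCM.V2",
                key: decode_key(System.fetch_env!("CLOAK_KEY_V2")),
                iv_length: 12},
      retired: {Cloak.Ciphers.AES.GCM,
                tag: "AES.GCM.V1",
                key: decode_key(System.fetch_env!("CLOAK_KEY_V1")),
                iv_length: 12}
    )

  {:ok, config}
end
```

Once all V1 rows have been re-encrypted (e.g. by a background migration touching each row), the retired cipher can be removed.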

Separation of duties: Storage-level key held in KMS/CloudHSM; application-level key held in Secrets Manager/SSM. A DBA with direct RDS access cannot read the application-encrypted columns without also compromising the secret store.


Data Flow (Five Canonical Paths)

1. Command (write) — the default in every domain

LiveView / REST controller
  └─ Tenant plug sets {org_id, user_id, correlation_id} in Process dictionary
      └─ Finnest.<Domain>.<Context>.<command>(args, actor)
          ├─ Ecto.Changeset.cast + validate
          ├─ FinnestCompliance.check/2     [if command is gated — AR-18]
          ├─ Repo.transaction fn ->
          │    ├─ Repo.insert/update        (domain row)
          │    ├─ EventStore.append(%Event{...})   (same transaction — atomicity)
          │    └─ (trigger fires: hash chain computed, row written)
          │  end
          ├─ Phoenix.PubSub.broadcast("org:#{org_id}:#{domain}", event)
          └─ {:ok, entity} | {:error, changeset}

Atomicity: domain write and event emission happen in the same transaction. PubSub broadcast is post-commit (outside transaction) to avoid broadcasting uncommitted state.
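A sketch of that shape — domain write and EventStore.append/1 in one transaction, broadcast after commit (context/function names other than EventStore.append/1 are illustrative):

```elixir
def assign_shift(attrs, actor) do
  changeset = Shift.changeset(%Shift{}, attrs)

  result =
    Repo.transaction(fn ->
      shift = Repo.insert!(changeset)

      # Same transaction as the domain write — row and event commit atomically
      EventStore.append(%Event{
        domain: "roster",
        event_type: "shift_assigned",
        aggregate_id: shift.id,
        payload: %{shift_id: shift.id},
        metadata: %{actor_id: actor.id}
      })

      shift
    end)

  # Broadcast only after commit, so subscribers never see uncommitted state
  with {:ok, shift} <- result do
    Phoenix.PubSub.broadcast(
      Finnest.PubSub,
      "org:#{shift.org_id}:roster",
      {:shift_assigned, shift.id}
    )

    {:ok, shift}
  end
end
```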

2. Event subscriber (async cross-domain)

Phoenix.PubSub delivers event to subscribing domain's handler module
  └─ handler.handle(event)
      └─ Oban.insert(ReactionWorker, args: %{event_id, org_id, correlation_id, ...})
          └─ ReactionWorker.perform (idempotent — QJ-01)
              ├─ look up aggregate, apply reaction
              ├─ may emit further events (causation_id links chain)
              └─ bounded by AI-05: max 10 events per correlation chain

Idempotency pattern: every worker checks if its effect has already been applied (via aggregate state or a dedupe marker) before doing anything observable.
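One way to realise the dedupe-marker variant (schema and module names are assumptions, not a fixed design):

```elixir
# Idempotent reaction (QJ-01): check for an existing marker before doing
# anything observable; marker insert rides in the same transaction as the effect.
def perform_reaction(event) do
  handler = to_string(__MODULE__)

  case Repo.get_by(ReactionMarker, event_id: event.id, handler: handler) do
    %ReactionMarker{} ->
      :ok  # already applied — retry is a no-op

    nil ->
      Repo.transaction(fn ->
        apply_reaction(event)
        Repo.insert!(%ReactionMarker{event_id: event.id, handler: handler})
      end)
  end
end
```

A unique index on (event_id, handler) closes the race between two concurrent retries.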

3. Query (read)

LiveView mount / API / MCP tool
  └─ Finnest.<Domain>.<Queries>.<function>(params, org_id)
      ├─ Ecto.Query scoped by org_id (automatic via prepare_query)
      ├─ Explicit preloads (CQ-05) — no lazy loading
      ├─ Optional ETS cache read for hot paths (feature flags, credential types, awards)
      └─ {:ok, results}
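The ETS read for hot reference data could be as simple as a lookup-or-load helper (module and table names illustrative):

```elixir
defmodule FinnestCore.RefCache do
  @moduledoc "Hot-path cache for reference data (feature flags, credential types, awards). Sketch."
  @table :finnest_ref_cache

  def init do
    :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])
  end

  # Returns the cached value, or runs loader/0, caches, and returns its result.
  def fetch(key, loader) when is_function(loader, 0) do
    case :ets.lookup(@table, key) do
      [{^key, value}] ->
        value

      [] ->
        value = loader.()
        :ets.insert(@table, {key, value})
        value
    end
  end
end

# Usage:
# RefCache.fetch({:credential_types, :all}, fn -> Repo.all(CredentialType) end)
```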

4. Agent-mediated read

User query → Orchestrator
  ├─ Tier-1: pattern match → route to MCP tool directly, no LLM
  ├─ Tier-2: LLM classifies → one or many MCP tool calls → LLM composes response
  └─ Every tool call logged to agents.tool_audit with cost + ms + correlation_id

Full detail in agents.md.

5. Mobile offline sync

Full detail in mobile.md. Summary:

Flutter app offline → SQLite event queue (mirrors backend event shape)
  └─ on connectivity: POST /api/v1/sync { events: [...], last_sync_at }
      └─ server-side reconciliation per conflict rules (B07)
          ├─ clock events: client wins (timestamp authoritative)
          ├─ roster changes: server wins (supervisor may have changed)
          ├─ leave requests: server decides (may be approved/rejected)
          └─ time cards: merge with server validation
      └─ return {confirmations, conflicts, server_updates}

Strangler Fig Migration

Migration topology

MySQL (admin_central, admin_atslive, actatek)
 │  (read-only via Finnest.V2Repo + MyXQL)
 │  Oban migration workers stream v2 data
 │  Write to Finnest.Repo (Postgres)
 │  Populate public.id_mappings
PostgreSQL (Finnest primary)
 │  During transition:
 │  - Phase 1-2: Finnest read-copy, v2 source of truth
 │  - Phase 3-6: Finnest becomes source of truth per domain as cutover completes
 │  - Phase 7-8: v2 read-only archive → decommission
Applications query Postgres

Dual Ecto repos

# config/config.exs
config :finnest, ecto_repos: [Finnest.Repo, Finnest.V2Repo]

# Primary repo — Postgres
defmodule Finnest.Repo do
  use Ecto.Repo, otp_app: :finnest_core, adapter: Ecto.Adapters.Postgres
  # ... prepare_query hook ...
end

# Migration read-only repo — MySQL
defmodule Finnest.V2Repo do
  use Ecto.Repo, otp_app: :finnest_core, adapter: Ecto.Adapters.MyXQL, read_only: true

  # No migrations directory
  # Read-only at connection level: user has SELECT only
  # Schemas use @primary_key false if v2 table has no PK
end

# Example read-only schema for v2 data
defmodule Finnest.V2.Candidate do
  use Ecto.Schema
  # DA-02: explicit fields only, no `field :any, :integer` fallback

  @primary_key {:id, :integer, []}
  schema "candidates" do
    field :first_name, :string
    field :last_name, :string
    # ... etc — exactly what we read, nothing more
  end
end

Guarantees (AR-05, DA-01, DA-02):

  • V2Repo connection credentials have SELECT only (no INSERT/UPDATE/DELETE permission at DB level)
  • read_only: true at Ecto level prevents accidental insert/update/delete calls
  • Schemas declare only fields we read — no * queries
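The DB-level guarantee is a plain MySQL grant; a sketch (user name and host pattern are illustrative, password elided):

```sql
-- v2 migration reader: SELECT only, nothing else (AR-05)
CREATE USER 'finnest_migrator'@'%' IDENTIFIED BY '...';
GRANT SELECT ON admin_central.*  TO 'finnest_migrator'@'%';
GRANT SELECT ON admin_atslive.*  TO 'finnest_migrator'@'%';
GRANT SELECT ON actatek.*        TO 'finnest_migrator'@'%';
```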

Phase order (B11, supersedes B04)

Phase 1 (Weeks 1-4 post go-live):
  [Finnest: Recruit] ──handoff──→ [v2: Onboard, CMS, Roster, T&A, Payroll]

Phase 2 (Weeks 5-8):
  [Finnest: Recruit, Onboard] ──handoff──→ [v2: CMS, Roster, T&A, Payroll]

Phase 3 (Weeks 9-16):
  [Finnest: Recruit, Onboard, CMS, Roster] ──handoff──→ [v2: T&A, Payroll]

Phase 4 (Weeks 17-24):
  [Finnest: + T&A, Reporting] ──handoff──→ [v2: Payroll only]

Phase 5 (Weeks 25-32):
  [Finnest: Everything + Mobile + Pact] ──→ [v2: read-only archive]

Per-phase migration procedure

Pre-migration:

  1. Data quality audit — run read-only SQL against v2 for orphans, duplicates, encoding issues (B04 audit checklist)
  2. Dry-run — migration workers run with write: false flag; output row-count parity report + statistical diff
  3. Stakeholder sign-off on audit findings and dry-run report
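The dry-run in step 2 can be sketched as a function that runs the real transform but never writes — module name, `transform_fun` contract (`{:ok, struct}` or `{:error, reason}` per row), and report fields are all assumptions for illustration:

```elixir
defmodule Finnest.Migration.DryRun do
  @moduledoc "Dry-run parity report: transform v2 rows without writing (sketch)."

  # Runs the same transform as the real migration worker, but only counts
  # outcomes — nothing touches Finnest.Repo.
  def parity_report(v2_schema, transform_fun) do
    results =
      v2_schema
      |> Finnest.V2Repo.all()
      |> Enum.map(transform_fun)

    %{
      source_count: length(results),
      transformable: Enum.count(results, &match?({:ok, _}, &1)),
      rejected: Enum.count(results, &match?({:error, _}, &1))
    }
  end
end
```

A non-zero `rejected` count feeds directly into the step-3 sign-off discussion, alongside the statistical diff.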

Migration window:

  1. Migration workers queued in Oban
  2. Read from V2Repo, transform, write to primary Repo, populate id_mappings
  3. Per-entity correlation_id propagated so anomalies trace back to source
  4. Progress tracked in migration_runs table (per phase, per batch, per entity type)
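Steps 1-4 can be sketched as an Oban worker. This is a minimal sketch under stated assumptions — `Finnest.IdMapping`, `Finnest.Recruit.Candidate`, and the column mapping in `transform/2` are hypothetical names, not the actual modules:

```elixir
defmodule Finnest.Migration.Workers.MigrateBatch do
  use Oban.Worker, queue: :migration, max_attempts: 5

  import Ecto.Query

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"offset" => offset, "limit" => limit, "correlation_id" => cid}}) do
    # Read a batch from the read-only MySQL repo...
    from(c in Finnest.V2.Candidate, order_by: c.id, offset: ^offset, limit: ^limit)
    |> Finnest.V2Repo.all()
    |> Enum.each(fn v2_row ->
      # ...then write the transformed row and its id mapping atomically.
      {:ok, _} =
        Finnest.Repo.transaction(fn ->
          new = Finnest.Repo.insert!(transform(v2_row, cid))

          Finnest.Repo.insert!(%Finnest.IdMapping{
            source_system: "admin_central",
            source_table: "candidates",
            source_id: v2_row.id,
            target_id: new.id,
            correlation_id: cid
          })
        end)
    end)

    :ok
  end

  # Stand-in for the real v2 → Finnest column mapping.
  defp transform(v2_row, cid) do
    %Finnest.Recruit.Candidate{
      first_name: v2_row.first_name,
      last_name: v2_row.last_name,
      migration_correlation_id: cid
    }
  end
end
```

Wrapping the insert and the `id_mappings` write in one transaction means a crashed batch never leaves an entity without its mapping, and Oban's retry picks up where the batch failed.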

Post-migration:

  1. Row-count parity check: v2 count == Finnest active count
  2. Sampling validation: 100 random entities compared field-by-field
  3. Monetary statistical diff: sum of all monetary columns must match to the cent
  4. Handoff switch: app starts reading the migrated domain from Finnest; v2 becomes read-only for that domain
  5. 3-month parallel access window before v2 is decommissioned for the domain — users can still query v2 for historical lookups
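Checks 1 and 3 above can be sketched with Ecto aggregates — schema arguments and the module name are hypothetical:

```elixir
defmodule Finnest.Migration.Validate do
  @moduledoc "Post-migration parity checks (illustrative sketch)."

  import Ecto.Query

  # Check 1: v2 count must equal the Finnest active (non-soft-deleted) count.
  def row_count_parity?(v2_schema, target_schema) do
    v2_count = Finnest.V2Repo.aggregate(v2_schema, :count)

    target_count =
      from(t in target_schema, where: is_nil(t.deleted_at))
      |> Finnest.Repo.aggregate(:count)

    v2_count == target_count
  end

  # Check 3: sums of a monetary column must match to the cent.
  def monetary_match?(v2_schema, target_schema, column) do
    v2_sum = Finnest.V2Repo.aggregate(v2_schema, :sum, column) || Decimal.new(0)
    target_sum = Finnest.Repo.aggregate(target_schema, :sum, column) || Decimal.new(0)
    Decimal.equal?(v2_sum, target_sum)
  end
end
```

Running both checks per entity type before the handoff switch keeps the cutover decision mechanical rather than judgment-based.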

Rollback

  • Per-phase rollback possible within 24 hours via v2→Finnest difference replay
  • After 24 hours, rollback is event-replay from event store + archived v2 snapshots
  • Documented in runbooks/migration-rollback.md (Phase 3)

Reference Data vs Operational Data

| Classification | Examples | Scope | Cross-schema readable? |
|---|---|---|---|
| Operational | Employees, shifts, timecards, pay runs, invoices, incidents | Per-tenant | No — events only (AR-07) |
| Reference (platform-wide) | Credential types, industry profiles, awards, award rates, labour hire licensing rules | Platform | Yes — documented exception |
| Reference (tenant-specific) | Rate cards, shift templates, roster rules | Per-tenant | No — events only |

The reference-data exception exists because:

  • These are stable facts (a "forklift licence" is the same fact everywhere)
  • Read-only from application perspective (only admin UI writes them)
  • Without the exception, every domain would duplicate credential lookups over events — operational pain for no gain
  • Compliance.check/2 already crosses the boundary under AR-18 — so the exception is small
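In Ecto, the exception amounts to a domain reading a reference table in another Postgres schema via `@schema_prefix`. A minimal sketch, assuming a `compliance` schema and these field names:

```elixir
defmodule Finnest.Reference.CredentialType do
  use Ecto.Schema

  # Postgres schema holding platform-wide reference tables (assumed name).
  @schema_prefix "compliance"

  schema "credential_types" do
    field :code, :string   # e.g. "forklift_licence"
    field :name, :string
    field :platform_wide, :boolean, default: true
  end
end

# Any domain may read this under the documented exception;
# only the admin UI writes it:
#   Finnest.Repo.all(Finnest.Reference.CredentialType)
```

Because the table is read-only from the application's perspective, caching it in ETS (as the hot-query table below assumes) is safe without invalidation machinery beyond admin writes.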

Data Retention Matrix

| Category | Commercial | IRAP | Mechanism |
|---|---|---|---|
| Operational data (employees, shifts, etc.) | Active + 3 years post-termination | Active + 7 years | Soft delete (deleted_at); periodic hard-delete-after-retention job |
| Event store | Permanent (partitioned monthly) | Same (7 years required, kept permanently for data flywheel) | Partition archival; retained partitions accessible via same query |
| Agent sessions / messages | 90 days rolling | 7 years | Partitioned or rolling cleanup |
| Agent tool audit | 90 days | 7 years | Same |
| Audit log entries | 3 years | 7 years | Same as event store |
| Documents (S3) | 3 years post-termination | 7 years | S3 lifecycle policy (commercial) / Object Lock compliance mode (IRAP) |
| Backups | 30 daily, 12 monthly, 5 yearly | 30 daily, 12 monthly, 7 yearly | DA-09, IR-10 |
| Logs (CloudWatch) | 90 days | 7 years | CloudWatch retention policy |
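The "periodic hard-delete-after-retention job" from the matrix could be an Oban worker along these lines — module name, queue, and `Finnest.People.Employee` are assumptions, and the commercial 3-year window is hard-coded for illustration:

```elixir
defmodule Finnest.Retention.HardDeleteWorker do
  use Oban.Worker, queue: :maintenance

  import Ecto.Query

  # Commercial tier: 3 years post-termination; IRAP would use 7.
  @retention_seconds 3 * 365 * 86_400

  @impl Oban.Worker
  def perform(_job) do
    cutoff = DateTime.add(DateTime.utc_now(), -@retention_seconds, :second)

    # Hard-delete only rows that were soft-deleted before the cutoff.
    Finnest.Repo.delete_all(
      from e in Finnest.People.Employee,
        where: not is_nil(e.deleted_at) and e.deleted_at < ^cutoff
    )

    :ok
  end
end
```

Scheduling it via Oban's cron plugin (e.g. nightly) keeps retention enforcement auditable in the same job infrastructure as migrations.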

Capacity Planning (repeated from main for convenience)

Phase 1 (5000 employees target)

| Resource | Estimate |
|---|---|
| BEAM node | 1 × c6g.xlarge (4 vCPU / 8 GB) |
| Postgres | 1 × db.r6g.large (2 vCPU / 16 GB / 100 GB gp3) |
| Event store growth | ~500K events/month |
| Index build time | ~30 min (initial) / ~2 min per partition (monthly) |
| Migration window per domain | ~2-4 hours for 5K employees |
| id_mappings size | ~5K rows initially, grows with migration phases |

Hot query patterns (target p95 latencies)

| Query | Table | Target p95 | Notes |
|---|---|---|---|
| Employee list by org | people.employees | <50ms | Index on (org_id, deleted_at) |
| Today's shifts | roster.shifts | <50ms | Index on (org_id, date_range) |
| Clock-in insert | timekeep.clock_events | <100ms | Atomic insert + Compliance.check in transaction |
| Credential check | compliance.credential_types via ETS | <5ms | Hot cache; DB fallback rare |
| Event store append | events.domain_events_<month> | <20ms | Append-only partition |
| Agent tool audit insert | agents.tool_audit | <20ms | Separate high-volume partition (future) |
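The two list-query indexes above could be created with an Ecto migration like the following — the migration name, index names, and the choice of a partial index and GiST are illustrative, not the committed design:

```elixir
defmodule Finnest.Repo.Migrations.AddHotQueryIndexes do
  use Ecto.Migration

  def change do
    # Employee list by org: partial index keeps soft-deleted rows out,
    # matching the `deleted_at IS NULL` filter on the hot path.
    create index(:employees, [:org_id],
             prefix: "people",
             where: "deleted_at IS NULL",
             name: :employees_org_id_active_idx
           )

    # Today's shifts: GiST suits range-overlap queries on date_range.
    create index(:shifts, [:org_id, :date_range],
             prefix: "roster",
             using: :gist,
             name: :shifts_org_date_range_idx
           )
  end
end
```

The `:prefix` option is what targets the per-domain Postgres schemas (`people`, `roster`) rather than `public`.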

Open Questions

| # | Item | Phase | Owner |
|---|---|---|---|
| DA-OI-01 | agents.tool_audit partitioning strategy — monthly like events, or by org? Consider when volume >10M rows | Phase 2 | Infra |
| DA-OI-02 | pgvector index when L2/L3 memory embeddings stabilise — ivfflat or hnsw? | Phase 3 | Agents team |
| DA-OI-03 | Event store archival: move >3-year partitions to S3 Parquet? When? | Phase 4 | Infra |
| DA-OI-04 | Cross-tenant anonymisation job for L3 memories — pseudonymisation approach | Phase 3 | Agents team |
| DA-OI-05 | id_mappings retention: kept permanently for audit, or archived after 5 years? | Phase 4 | Ops |

References