Skip to content
4 changes: 3 additions & 1 deletion flexus_client_kit/ckit_erp.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def check_record_matches_filter(record: dict, f: str, col_names: set = None) ->
Check if a single record matches a single filter string.
Filter format: "col:op:val" or "col:op"

Standard operators: =, !=, >, >=, <, <=, IN, NOT_IN, LIKE, ILIKE, IS_NULL, IS_NOT_NULL, IS_EMPTY, IS_NOT_EMPTY
Standard operators: =, !=, >, >=, <, <=, IN, NOT_IN, LIKE, ILIKE, CIEQL, IS_NULL, IS_NOT_NULL, IS_EMPTY, IS_NOT_EMPTY
Array operators: contains, not_contains
JSON path: "task_details->email_subtype:=:welcome"
"""
Expand Down Expand Up @@ -295,6 +295,8 @@ def check_record_matches_filter(record: dict, f: str, col_names: set = None) ->
if op in ("NOT_IN", "NOT IN"):
vals = [v.strip() for v in filter_val.split(",")]
return str(val) not in vals
if op == "CIEQL":
return str(val).lower() == filter_val.lower()
if op in ("LIKE", "ILIKE"):
s = str(val).lower() if op == "ILIKE" else str(val)
pattern = filter_val.lower() if op == "ILIKE" else filter_val
Expand Down
4 changes: 2 additions & 2 deletions flexus_client_kit/integrations/fi_erp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
name="erp_table_data",
description=(
"Query ERP table data with filtering. "
"Operators: =, !=, >, >=, <, <=, LIKE, ILIKE, IN, NOT_IN, IS_NULL, IS_NOT_NULL. "
"LIKE/ILIKE use SQL wildcards: % matches any chars. "
"Operators: =, !=, >, >=, <, <=, LIKE, ILIKE, CIEQL, IN, NOT_IN, IS_NULL, IS_NOT_NULL. "
"LIKE/ILIKE use SQL wildcards: % matches any chars. CIEQL: Case Insensitive Equal. "
"JSON path: details->subtype:=:welcome. "
"Examples: "
'filters="status:=:active" for single filter, '
Expand Down
2 changes: 1 addition & 1 deletion flexus_client_kit/integrations/fi_gmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ async def _create_activity_for_email(self, to: str, subject: str, body: str, ft_
try:
contacts = await ckit_erp.query_erp_table(
self.fclient, "crm_contact", self.rcx.persona.ws_id, erp_schema.CrmContact,
filters=f"contact_email:ILIKE:{email}", limit=1,
filters=f"contact_email:CIEQL:{email}", limit=1,
)
if not contacts:
continue
Expand Down
201 changes: 201 additions & 0 deletions flexus_client_kit/integrations/fi_resend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import json
import logging
import os
from dataclasses import dataclass
from typing import Dict, Any, List, Optional

import gql
import httpx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


from flexus_client_kit import ckit_bot_exec, ckit_bot_query, ckit_client, ckit_cloudtool

logger = logging.getLogger("resend")

RESEND_API_KEY = os.environ.get("RESEND_API_KEY", "")
RESEND_TESTING_DOMAIN = os.environ.get("RESEND_TESTING_DOMAIN", "")
RESEND_BASE = "https://api.resend.com"

RESEND_SETUP_SCHEMA = [
{
"bs_name": "DOMAINS",
"bs_type": "string_multiline",
"bs_default": "{}",
"bs_group": "Email",
"bs_importance": 0,
"bs_description": 'Registered domains, e.g. {"mail.example.com": "d_abc123"}. Send and receive emails from these domains. Incoming emails are logged as CRM activities.',
},
]

RESEND_PROMPT = f"""## Email

Use email_send() to send emails. Use email_setup_domain() to register and manage sending domains, call email_setup_domain(op="help") first.
Users can configure EMAIL_RESPOND_TO addresses — emails to those addresses are handled as tasks, all others are logged as CRM activities.
Strongly recommend using a subdomain (e.g. mail.example.com) instead of the main domain, especially for inbound emails.
If no domain is configured, send from *@{RESEND_TESTING_DOMAIN} for testing.
Never use flexus_my_setup() for email domains — they are saved automatically via email_setup_domain() tool."""

RESEND_SEND_TOOL = ckit_cloudtool.CloudTool(
strict=True,
name="email_send",
description="Send an email. Provide html and/or text body.",
parameters={
"type": "object",
"properties": {
"from": {"type": "string", "order": 1, "description": "Sender, e.g. Name <noreply@domain.com>"},
"to": {"type": "array", "items": {"type": "string"}, "order": 2, "description": "Recipient email addresses"},
"subject": {"type": "string", "order": 3, "description": "Subject line"},
"html": {"type": "string", "order": 4, "description": "HTML body, or empty string if text-only"},
"text": {"type": "string", "order": 5, "description": "Plain text fallback, or empty string if html-only"},
"cc": {"type": "array", "items": {"type": "string"}, "order": 6, "description": "CC recipient email addresses"},
"bcc": {"type": "array", "items": {"type": "string"}, "order": 7, "description": "BCC recipient email addresses"},
"reply_to": {"type": "string", "order": 8, "description": "Reply-to address, or empty string"},
},
"required": ["from", "to", "subject", "html", "text", "cc", "bcc", "reply_to"],
"additionalProperties": False,
},
)

RESEND_SETUP_TOOL = ckit_cloudtool.CloudTool(
strict=False,
name="email_setup_domain",
description="Manage email domains: add, verify, check status, list, delete. Call with op=\"help\" for usage. Before adding a domain, ask the user if they want to enable receiving emails on it.",
parameters={
"type": "object",
"properties": {
"op": {"type": "string", "description": "Operation: help, add, verify, status, list, delete"},
"args": {"type": "object"},
},
"required": [],
},
)

SETUP_HELP = """Email domain setup:

email_setup_domain(op="add", args={"domain": "yourdomain.com", "region": "us-east-1", "enable_receiving": true})
Register a domain or update an existing one. Returns DNS records to configure.
If the domain already exists, updates its settings (receiving, etc.).
Ask the user which region they prefer and whether they want to enable receiving emails.
Regions: us-east-1, eu-west-1, sa-east-1, ap-northeast-1.

email_setup_domain(op="verify", args={"domain_id": "..."})
Trigger verification after adding DNS records. May take a few minutes.

email_setup_domain(op="status", args={"domain_id": "..."})
Check verification status and DNS records.

email_setup_domain(op="list")
List all registered domains and their verification status.

email_setup_domain(op="delete", args={"domain_id": "..."})
Remove a domain.
"""


@dataclass
class ActivityEmail:
email_id: str
from_addr: str
from_full: str # "Name <email>" if available
to_addrs: List[str]
cc_addrs: List[str]
bcc_addrs: List[str]
subject: str
body_text: str
body_html: str


def _setup_help(has_domains: bool) -> str:
if not has_domains and RESEND_TESTING_DOMAIN:
return SETUP_HELP + f"No domains configured yet. Send from @{RESEND_TESTING_DOMAIN} in the meantime.\n"
return SETUP_HELP


async def _check_dns_txt(domain: str, expected: str) -> bool:
try:
async with httpx.AsyncClient(timeout=5) as c:
r = await c.get(f"https://dns.google/resolve?name={domain}&type=TXT")
return any(expected in a.get("data", "") for a in r.json().get("Answer", []))
except Exception as e:
logger.warning("DNS TXT check failed for %s: %s", domain, e)
return False


def parse_emessage(emsg: ckit_bot_query.FExternalMessageOutput) -> ActivityEmail:
payload = emsg.emsg_payload if isinstance(emsg.emsg_payload, dict) else json.loads(emsg.emsg_payload)
content = payload.get("email_content", {})
data = payload.get("data", {})
header_from = content.get("headers", {}).get("from", "")
return ActivityEmail(
email_id=data.get("email_id", emsg.emsg_external_id),
from_addr=emsg.emsg_from or data.get("from", ""),
from_full=header_from or data.get("from", "") or emsg.emsg_from,
to_addrs=data.get("to", []),
cc_addrs=data.get("cc", []),
bcc_addrs=data.get("bcc", []),
subject=content.get("subject", data.get("subject", "")),
body_text=content.get("text", ""),
body_html=content.get("html", ""),
)


class IntegrationResend:

def __init__(self, fclient: ckit_client.FlexusClient, rcx: ckit_bot_exec.RobotContext, domains: Dict[str, str], emails_to_register: set):
self.fclient = fclient
self.rcx = rcx
self.domains = domains # {"domain.com": "resend_domain_id"}
self.emails_to_register = emails_to_register

async def send_called_by_model(self, toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Optional[Dict[str, Any]]):
if not model_produced_args:
return "Provide from, to, subject, and html or text body."
a = model_produced_args
frm, to = a.get("from", ""), a.get("to", [])
if not frm or not to:
return "Missing required: 'from' and 'to'"
if not a.get("html", "") and not a.get("text", ""):
return "Provide 'html' and/or 'text'"
http = await self.fclient.use_http()
async with http as h:
r = await h.execute(gql.gql("""mutation ResendBotSendEmail($input: ResendEmailSendInput!) {
resend_email_send(input: $input)
}"""), variable_values={"input": {
"persona_id": self.rcx.persona.persona_id,
"email_from": frm,
"email_to": to,
"email_subject": a.get("subject", ""),
"email_html": a.get("html", ""),
"email_text": a.get("text", ""),
"email_cc": a.get("cc", []),
"email_bcc": a.get("bcc", []),
"email_reply_to": a.get("reply_to", ""),
}})
rid = r.get("resend_email_send", "")
logger.info("sent email %s to %s", rid, to)
return ckit_cloudtool.ToolResult(content=f"Email sent (id: {rid})", dollars=0)

async def setup_called_by_model(self, toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Optional[Dict[str, Any]]):
if not model_produced_args:
return _setup_help(bool(self.domains))
op = model_produced_args.get("op", "")
args, args_error = ckit_cloudtool.sanitize_args(model_produced_args)
if args_error:
return args_error
if not op or "help" in op:
return _setup_help(bool(self.domains))
gql_input = {"persona_id": self.rcx.persona.persona_id, "op": op}
for k in ("domain", "domain_id", "region"):
if v := ckit_cloudtool.try_best_to_find_argument(args, model_produced_args, k, None):
gql_input[k] = v
if ckit_cloudtool.try_best_to_find_argument(args, model_produced_args, "enable_receiving", False):
gql_input["enable_receiving"] = True
if op == "add" and not gql_input.get("domain"):
return "domain is required for add"
if op in ("verify", "status", "delete") and not gql_input.get("domain_id"):
return "domain_id is required for " + op
http = await self.fclient.use_http()
async with http as h:
r = await h.execute(gql.gql("""mutation ResendBotSetupDomain($input: ResendSetupDomainInput!) {
resend_setup_domain(input: $input)
}"""), variable_values={"input": gql_input})
return r.get("resend_setup_domain", "")
67 changes: 61 additions & 6 deletions flexus_simple_bots/vix/vix_bot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import email.utils
import json
import logging
import time
from dataclasses import asdict
from typing import Dict, Any

Expand All @@ -12,12 +14,14 @@
from flexus_client_kit import ckit_shutdown
from flexus_client_kit import ckit_ask_model
from flexus_client_kit import ckit_mongo
from flexus_client_kit import ckit_erp
from flexus_client_kit import ckit_kanban
from flexus_client_kit import erp_schema
from flexus_client_kit.integrations import fi_mongo_store
from flexus_client_kit.integrations import fi_pdoc
from flexus_client_kit.integrations import fi_erp
from flexus_client_kit.integrations import fi_gmail
from flexus_client_kit.integrations import fi_crm_automations
from flexus_client_kit.integrations import fi_resend
from flexus_client_kit.integrations import fi_telegram
from flexus_simple_bots.vix import vix_install
from flexus_simple_bots.version_common import SIMPLE_BOTS_COMMON_VERSION
Expand All @@ -36,8 +40,9 @@
fi_erp.ERP_TABLE_DATA_TOOL,
fi_erp.ERP_TABLE_CRUD_TOOL,
fi_erp.ERP_CSV_IMPORT_TOOL,
fi_gmail.GMAIL_TOOL,
fi_crm_automations.CRM_AUTOMATION_TOOL,
fi_resend.RESEND_SEND_TOOL,
fi_resend.RESEND_SETUP_TOOL,
fi_telegram.TELEGRAM_TOOL,
]

Expand All @@ -54,10 +59,12 @@ def get_setup():

pdoc_integration = fi_pdoc.IntegrationPdoc(rcx, rcx.persona.ws_root_group_id)
erp_integration = fi_erp.IntegrationErp(fclient, rcx.persona.ws_id, personal_mongo)
gmail_integration = fi_gmail.IntegrationGmail(fclient, rcx)
automations_integration = fi_crm_automations.IntegrationCrmAutomations(
fclient, rcx, get_setup, available_erp_tables=ERP_TABLES,
)
resend_domains = (rcx.persona.persona_setup or {}).get("DOMAINS", {})
email_respond_to = set(a.strip().lower() for a in get_setup().get("EMAIL_RESPOND_TO", "").split(",") if a.strip())
resend_integration = fi_resend.IntegrationResend(fclient, rcx, resend_domains, email_respond_to)
telegram = fi_telegram.IntegrationTelegram(fclient, rcx, get_setup().get("TELEGRAM_BOT_TOKEN", ""))
await telegram.register_webhook_and_start()

Expand All @@ -73,6 +80,50 @@ async def updated_thread_in_db(th: ckit_ask_model.FThreadOutput):
async def updated_task_in_db(t: ckit_kanban.FPersonaKanbanTaskOutput):
pass

@rcx.on_emessage("EMAIL")
async def handle_email(emsg):
em = fi_resend.parse_emessage(emsg)
body = em.body_text or em.body_html or "(empty)"
try:
display_name, addr = email.utils.parseaddr(em.from_full)
addr = addr or em.from_addr
contacts = await ckit_erp.query_erp_table(
fclient, "crm_contact", rcx.persona.ws_id, erp_schema.CrmContact,
filters=f"contact_email:CIEQL:{addr}", limit=1,
)
if contacts:
contact_id = contacts[0].contact_id
else:
parts = display_name.split(None, 1) if display_name else [addr.split("@")[0]]
contact_id = await ckit_erp.create_erp_record(fclient, "crm_contact", rcx.persona.ws_id, {
"ws_id": rcx.persona.ws_id,
"contact_email": addr.lower(),
"contact_first_name": parts[0],
"contact_last_name": parts[1] if len(parts) > 1 else "(unknown)",
})
await ckit_erp.create_erp_record(fclient, "crm_activity", rcx.persona.ws_id, {
"ws_id": rcx.persona.ws_id,
"activity_title": em.subject,
"activity_type": "EMAIL",
"activity_direction": "INBOUND",
"activity_platform": "RESEND",
"activity_contact_id": contact_id,
"activity_summary": body[:500],
"activity_occurred_ts": time.time(),
})
except Exception:
logger.exception("Failed to create CRM activity for inbound email from %s", em.from_addr)
if not email_respond_to.intersection(a.lower() for a in em.to_addrs):
return
title = "Email from %s: %s" % (em.from_addr, em.subject)
if em.cc_addrs:
title += " (cc: %s)" % ", ".join(em.cc_addrs)
await ckit_kanban.bot_kanban_post_into_inbox(
fclient, rcx.persona.persona_id,
title=title, details_json=json.dumps({"from": em.from_addr, "to": em.to_addrs, "cc": em.cc_addrs, "subject": em.subject, "body": body[:2000]}),
provenance_message="vix_email_inbound",
)

@rcx.on_emessage("TELEGRAM")
async def handle_telegram_emessage(emsg):
await telegram.handle_emessage(emsg)
Expand Down Expand Up @@ -106,9 +157,13 @@ async def toolcall_erp_crud(toolcall: ckit_cloudtool.FCloudtoolCall, model_produ
async def toolcall_erp_csv_import(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await erp_integration.handle_csv_import(toolcall, model_produced_args)

@rcx.on_tool_call(fi_gmail.GMAIL_TOOL.name)
async def toolcall_gmail(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await gmail_integration.called_by_model(toolcall, model_produced_args)
@rcx.on_tool_call(fi_resend.RESEND_SEND_TOOL.name)
async def toolcall_email_send(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await resend_integration.send_called_by_model(toolcall, model_produced_args)

@rcx.on_tool_call(fi_resend.RESEND_SETUP_TOOL.name)
async def toolcall_email_setup(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await resend_integration.setup_called_by_model(toolcall, model_produced_args)

@rcx.on_tool_call(fi_crm_automations.CRM_AUTOMATION_TOOL.name)
async def toolcall_crm_automation(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
Expand Down
Loading