import logging
import os
from dotenv import load_dotenv
from livekit.plugins import openai, deepgram, silero
from livekit.plugins.turn_detector.english import EnglishModel
from livekit.agents import (
JobContext,
WorkerOptions,
cli,
Agent,
AgentSession,
ChatContext,
ChatMessage,
RunContext,
function_tool,
)
# Moss imports
from moss import MossClient, DocumentInfo, QueryOptions
# Copy variables from a local .env file (when present) into os.environ so the
# lookups below can see them.
load_dotenv()

# Configuration, read once at import time. The Moss credentials are None when
# the corresponding variables are unset.
MOSS_PROJECT_ID = os.environ.get("MOSS_PROJECT_ID")
MOSS_PROJECT_KEY = os.environ.get("MOSS_PROJECT_KEY")
# Long-term knowledge index name; "product-knowledge" unless overridden.
KNOWLEDGE_INDEX = os.environ.get("MOSS_INDEX_NAME", "product-knowledge")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("moss-agent")
class MossSemanticRetrievalAgent(Agent):
    """Customer-support voice agent backed by Moss retrieval.

    Exposes two LLM tools:

    * ``search_knowledge_base`` queries the long-term ``KNOWLEDGE_INDEX``
      through ``moss_client``.
    * ``search_conversation`` queries the short-term, per-call
      ``moss_session``.

    Every completed user turn with non-blank text is also added to
    ``moss_session`` so it can be recalled later in the same call.
    """

    def __init__(self, moss_client: MossClient, moss_session) -> None:
        """Create the agent.

        Args:
            moss_client: Client used to query ``KNOWLEDGE_INDEX``.
            moss_session: Per-call Moss session used to store and recall user
                turns (``entrypoint`` passes the object returned by
                ``MossClient.session``).
        """
        super().__init__(
            instructions=(
                "\n"
                "You are a helpful customer support voice assistant.\n"
                "When you need facts about products or policies, call\n"
                "search_knowledge_base. To recall something said earlier in this\n"
                "call, call search_conversation. If the tools return nothing\n"
                "useful, say you don't know.\n"
            )
        )
        self.moss = moss_client
        self.moss_session = moss_session  # short-term, per-call SessionIndex
        # The session may already hold turns from an earlier leg of this call
        # (it is auto-loaded on handoff). Continue numbering after the docs it
        # holds so new "user-turn-N" ids do not collide with stored ones.
        # Best-effort: assumes earlier legs used the same id scheme.
        self._turn = getattr(moss_session, "doc_count", 0) or 0

    @staticmethod
    def _format_docs(results, empty_message: str) -> str:
        """Render query hits as ``- <text>`` lines, or ``empty_message`` if none."""
        if not results.docs:
            return empty_message
        return "\n".join(f"- {doc.text}" for doc in results.docs)

    @function_tool
    async def search_knowledge_base(self, context: RunContext, query: str) -> str:
        """Search the product and support knowledge base.
        Args:
            query: A focused query describing the facts to look up.
        """
        # NOTE: function_tool builds the LLM-facing tool description from the
        # docstring above, so its wording is part of the prompt.
        try:
            results = await self.moss.query(
                KNOWLEDGE_INDEX, query, QueryOptions(top_k=5, alpha=0.8)
            )
        except Exception:
            # The entrypoint tolerates a failed index load, so queries can fail
            # here. Return text the LLM can act on rather than raising.
            logger.exception("Knowledge base query failed")
            return "The knowledge base is currently unavailable."
        return self._format_docs(results, "No relevant entries found.")

    @function_tool
    async def search_conversation(self, context: RunContext, query: str) -> str:
        """Recall something said earlier in this same call.
        Args:
            query: What to look for in the conversation so far.
        """
        try:
            results = await self.moss_session.query(query, QueryOptions(top_k=3))
        except Exception:
            logger.exception("Conversation query failed")
            return "The conversation history is currently unavailable."
        return self._format_docs(
            results, "Nothing relevant was said earlier in this call."
        )

    async def on_user_turn_completed(
        self, turn_ctx: ChatContext, new_message: ChatMessage
    ) -> None:
        """Record the finished user turn in the call session, then defer to ``Agent``.

        Writing to the session (local, ~1-5 ms) makes the turn available to
        ``search_conversation`` and to the push at call end. This only writes
        to the session; nothing is injected into the prompt.
        """
        text = new_message.text_content
        # text_content may be None (no text parts). Blank turns are not worth
        # indexing and would only produce a failing or empty document.
        if text and text.strip():
            turn = self._turn + 1
            try:
                await self.moss_session.add_docs(
                    [DocumentInfo(id=f"user-turn-{turn}", text=text)]
                )
            except Exception as e:
                logger.exception("Failed to index turn: %s", e)
            else:
                # Advance only after a successful write. This keeps stored ids
                # contiguous, so a resumed session starting from doc_count
                # does not reuse one.
                self._turn = turn
        await super().on_user_turn_completed(turn_ctx, new_message)
async def entrypoint(ctx: JobContext):
    """LiveKit job entry point: wire Moss retrieval into a voice session.

    In order, it:

    1. connects to the room;
    2. creates a ``MossClient`` from the module-level credentials;
    3. tries to load ``KNOWLEDGE_INDEX`` (a failure is only logged);
    4. opens a per-call Moss session named after the room;
    5. registers a shutdown callback that pushes that session;
    6. starts an ``AgentSession`` running ``MossSemanticRetrievalAgent``.
    """
    await ctx.connect()

    client = MossClient(project_id=MOSS_PROJECT_ID, project_key=MOSS_PROJECT_KEY)

    # Long-term memory: bring the shared knowledge base into this process.
    # The agent still starts when this fails.
    try:
        await client.load_index(KNOWLEDGE_INDEX)
        logger.info(f"Loaded knowledge index: {KNOWLEDGE_INDEX}")
    except Exception as err:
        logger.warning(f"Knowledge index not loaded: {err}. Run build_index.py first.")

    # Short-term memory: one session per call, keyed by the room name. An
    # existing cloud index with this name (an earlier handoff) is auto-loaded;
    # otherwise the session starts empty.
    call_id = f"call-{ctx.room.name}"
    call_memory = await client.session(index_name=call_id)
    logger.info(f"Opened session '{call_id}' ({call_memory.doc_count} docs loaded)")

    async def persist_session():
        """At shutdown, push the call session to the cloud; failures are only logged."""
        try:
            pushed = await call_memory.push_index()
            logger.info(f"Pushed session '{call_id}': {pushed.doc_count} docs")
        except Exception as err:
            logger.error(f"Failed to push session: {err}")

    # Saving the session at call end lets it be resumed or handed off later.
    ctx.add_shutdown_callback(persist_session)

    # LiveKit voice pipeline: STT -> LLM -> TTS with turn detection and VAD.
    voice_session = AgentSession(
        stt=deepgram.STT(),
        llm=openai.LLM(model="gpt-4o"),
        tts=openai.TTS(),
        turn_detection=EnglishModel(),
        vad=silero.VAD.load(),
    )
    retrieval_agent = MossSemanticRetrievalAgent(
        moss_client=client, moss_session=call_memory
    )
    await voice_session.start(agent=retrieval_agent, room=ctx.room)
if __name__ == "__main__":
    # Run the LiveKit worker CLI with `entrypoint` as the job entry function.
    worker_options = WorkerOptions(entrypoint_fnc=entrypoint)
    cli.run_app(worker_options)