diff --git a/examples/multimodal/README.md b/examples/multimodal/README.md new file mode 100644 index 00000000..8b48f5ba --- /dev/null +++ b/examples/multimodal/README.md @@ -0,0 +1,25 @@ +# Multimodal with Fishjam and Gemini Live API + +This example shows how to integrate Fishjam with the Gemini Live API for multimodal (audio + video) interactions. +It periodically captures images from video tracks and sends them alongside audio to Gemini. + +## Development + +To start the development server, you must first copy `.env.example` to `.env`. + +Then you need to set the following variables: + +- `FISHJAM_ID`: your Fishjam ID, which you can get at [fishjam.io/app](https://fishjam.io/app) +- `FISHJAM_TOKEN`: your Fishjam management token, which you can get at [fishjam.io/app](https://fishjam.io/app) +- `GEMINI_API_KEY`: your Gemini API key, which you can get at [aistudio.google.com/apikey](https://aistudio.google.com/apikey) + +Once you've set up your environment variables, all you need to do is run the following command: + +```bash +yarn dev +``` + +When the server is running, you can obtain peer tokens by going to [http://localhost:3000/peers](http://localhost:3000/peers). + +When you connect peers with audio and video, the agent will periodically capture video frames and send them along with audio to Gemini for multimodal understanding. +You can connect peers with the [Fishjam `react-client` example](https://github.com/fishjam-cloud/web-client-sdk/tree/main/examples/react-client). 
diff --git a/examples/multimodal/package.json b/examples/multimodal/package.json new file mode 100644 index 00000000..c07231ca --- /dev/null +++ b/examples/multimodal/package.json @@ -0,0 +1,24 @@ +{ + "name": "multimodal-demo", + "version": "0.24.0", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1", + "format": "prettier --write .", + "typecheck": "tsc", + "dev": "bun run --watch src/index.ts" + }, + "dependencies": { + "@fishjam-cloud/js-server-sdk": "workspace:*", + "@google/genai": "^1.13.0", + "@grotto/logysia": "^0.1.6", + "bun": "^1.2.20", + "elysia": "latest" + }, + "devDependencies": { + "@types/bun": "^1", + "bun-types": "latest", + "prettier": "^3.6.2", + "typescript": "^5.9.3" + }, + "module": "src/index.js" +} diff --git a/examples/multimodal/src/const.ts b/examples/multimodal/src/const.ts new file mode 100644 index 00000000..2bb950fa --- /dev/null +++ b/examples/multimodal/src/const.ts @@ -0,0 +1,3 @@ +export const MULTIMODAL_MODEL = 'gemini-2.5-flash-native-audio-preview-12-2025'; + +export const CAPTURE_INTERVAL_MS = 3000; diff --git a/examples/multimodal/src/controllers/peers.ts b/examples/multimodal/src/controllers/peers.ts new file mode 100644 index 00000000..cee9f9e0 --- /dev/null +++ b/examples/multimodal/src/controllers/peers.ts @@ -0,0 +1,8 @@ +import { Elysia } from 'elysia'; +import { FishjamService } from '../service/fishjam'; + +export const peerController = (fishjam: FishjamService) => + new Elysia().get('/peers', async () => { + const { peer: _peer, peerToken } = await fishjam.createPeer(); + return { token: peerToken }; + }); diff --git a/examples/multimodal/src/environment.d.ts b/examples/multimodal/src/environment.d.ts new file mode 100644 index 00000000..527ff29f --- /dev/null +++ b/examples/multimodal/src/environment.d.ts @@ -0,0 +1,11 @@ +declare global { + namespace NodeJS { + interface ProcessEnv { + FISHJAM_ID: string; + FISHJAM_TOKEN?: string; + GEMINI_API_KEY?: string; + } + } +} + +export {}; diff --git 
a/examples/multimodal/src/index.ts b/examples/multimodal/src/index.ts new file mode 100644 index 00000000..ca76b0a2 --- /dev/null +++ b/examples/multimodal/src/index.ts @@ -0,0 +1,21 @@ +import { Elysia } from 'elysia'; +import { peerController } from './controllers/peers'; +import { FishjamService } from './service/fishjam'; +import { MultimodalService } from './service/multimodal'; + +if (!process.env.FISHJAM_ID || !process.env.FISHJAM_TOKEN || !process.env.GEMINI_API_KEY) { + throw Error('Environment variables FISHJAM_ID, FISHJAM_TOKEN and GEMINI_API_KEY are required.'); +} + +const fishjamConfig = { + fishjamId: process.env.FISHJAM_ID, + managementToken: process.env.FISHJAM_TOKEN, +}; + +const fishjam = new FishjamService(fishjamConfig); + +new MultimodalService(fishjamConfig, process.env.GEMINI_API_KEY); + +const app = new Elysia().use(peerController(fishjam)).listen(3000); + +console.log(`Elysia is running at ${app.server?.hostname}:${app.server?.port}`); diff --git a/examples/multimodal/src/service/fishjam.ts b/examples/multimodal/src/service/fishjam.ts new file mode 100644 index 00000000..0790b22a --- /dev/null +++ b/examples/multimodal/src/service/fishjam.ts @@ -0,0 +1,32 @@ +import { FishjamClient, FishjamConfig, RoomId, RoomNotFoundException } from '@fishjam-cloud/js-server-sdk'; + +export class FishjamService { + roomId?: RoomId; + fishjam: FishjamClient; + + constructor(config: FishjamConfig) { + this.fishjam = new FishjamClient(config); + } + + async createPeer() { + try { + return await this.makePeer(); + } catch (e) { + if (e instanceof RoomNotFoundException) { + await this.makeRoom(); + return this.makePeer(); + } + throw e; + } + } + + private async makeRoom() { + const { id: roomId } = await this.fishjam.createRoom(); + this.roomId = roomId; + } + + private async makePeer() { + if (!this.roomId) await this.makeRoom(); + return this.fishjam.createPeer(this.roomId!); + } +} diff --git a/examples/multimodal/src/service/multimodal.ts 
b/examples/multimodal/src/service/multimodal.ts new file mode 100644 index 00000000..33cf6763 --- /dev/null +++ b/examples/multimodal/src/service/multimodal.ts @@ -0,0 +1,227 @@ +import { + FishjamAgent, + FishjamConfig, + FishjamWSNotifier, + FishjamClient, + PeerConnected, + PeerDisconnected, + PeerId, + RoomId, + TrackAdded, + TrackRemoved, + TrackId, + IncomingTrackData, + IncomingTrackImage, +} from '@fishjam-cloud/js-server-sdk'; +import GeminiIntegration from '@fishjam-cloud/js-server-sdk/gemini'; +import { GoogleGenAI, LiveServerMessage, Modality, Session } from '@google/genai'; +import { MULTIMODAL_MODEL, CAPTURE_INTERVAL_MS } from '../const'; + +type AgentState = { + agent: FishjamAgent; + outputTrackId: TrackId; +}; + +export class MultimodalService { + peerSessions: Map = new Map(); + agents: Map = new Map(); + videoTracks: Map> = new Map(); + captureIntervals: Map> = new Map(); + ai: GoogleGenAI; + fishjamConfig: FishjamConfig; + fishjamClient: FishjamClient; + + constructor(fishjamConfig: FishjamConfig, geminiKey: string) { + this.ai = GeminiIntegration.createClient({ apiKey: geminiKey }); + this.fishjamConfig = fishjamConfig; + this.fishjamClient = new FishjamClient(fishjamConfig); + this.initFishjam(); + } + + private initFishjam() { + const notifier = new FishjamWSNotifier( + this.fishjamConfig, + (error) => console.error('Fishjam websocket error: %O', error), + (code, reason) => console.log(`Fishjam websocket closed. 
code: ${code}, reason: ${reason}`) + ); + + notifier.on('peerConnected', (msg) => this.handlePeerConnected(msg)); + notifier.on('peerDisconnected', (msg) => this.handlePeerDisconnected(msg)); + notifier.on('trackAdded', (msg) => this.handleTrackAdded(msg)); + notifier.on('trackRemoved', (msg) => this.handleTrackRemoved(msg)); + } + + async handlePeerConnected(message: PeerConnected) { + if (message.peerType === 2) return; + + console.log('Peer connected: %O', message); + + const peerId = message.peerId; + const agentState = this.agents.get(message.roomId); + + if (agentState && peerId === (agentState as { agent: FishjamAgent }).agent.constructor.name) return; + + if (agentState == undefined) { + const { + peer: { id: newAgentId }, + agent, + } = await this.fishjamClient.createAgent( + message.roomId, + { output: GeminiIntegration.geminiInputAudioSettings }, + { + onClose: (code, reason) => console.log(`Fishjam agent websocket closed. code: ${code}, reason: ${reason}`), + onError: (error) => console.error('Fishjam agent websocket error: %O', error), + } + ); + + const outputTrack = agent.createTrack(GeminiIntegration.geminiOutputAudioSettings); + + this.agents.set(message.roomId, { agent, outputTrackId: outputTrack.id }); + this.videoTracks.set(message.roomId, new Set()); + + agent.on('trackData', (msg) => this.handleTrackData(msg)); + agent.on('trackImage', (msg) => this.handleTrackImage(message.roomId, msg)); + + this.startImageCapture(message.roomId); + + console.log(`Agent ${newAgentId} created`); + } + + const session = await this.ai.live.connect({ + model: MULTIMODAL_MODEL, + config: { + responseModalities: [Modality.AUDIO], + }, + callbacks: { + onopen: () => console.log(`Connected to Gemini Live API for peer ${peerId}.`), + onerror: (error) => console.error(`Gemini error for peer ${peerId}: %O`, error), + onclose: (e) => + console.log(`Connection to Gemini Live API for peer ${peerId} closed. 
code: ${e.code}, reason: ${e.reason}`), + onmessage: (msg) => this.handleGeminiMessage(message.roomId, peerId, msg), + }, + }); + this.peerSessions.set(peerId, session); + } + + async handlePeerDisconnected(message: PeerDisconnected) { + const agentState = this.agents.get(message.roomId); + if (agentState) { + // Check if the disconnecting peer is the agent itself + const room = await this.fishjamClient.getRoom(message.roomId); + const isAgent = room.peers.every((peer) => peer.id !== message.peerId); + if (isAgent) return this.handleAgentDisconnected(message); + } + + this.handleWebrtcPeerDisconnected(message); + } + + handleAgentDisconnected(message: PeerDisconnected) { + console.log(`Agent ${message.peerId} disconnected`); + + this.stopImageCapture(message.roomId); + this.agents.delete(message.roomId); + this.videoTracks.delete(message.roomId); + } + + async handleWebrtcPeerDisconnected(message: PeerDisconnected) { + console.log('Peer disconnected: %O', message); + + const peerId = message.peerId; + const session = this.peerSessions.get(peerId); + session?.close(); + this.peerSessions.delete(peerId); + + const room = await this.fishjamClient.getRoom(message.roomId); + const activePeers = room.peers.filter((peer) => peer.status === 'connected'); + if (activePeers.length === 1) { + console.log('Last peer left room, removing agent'); + this.stopImageCapture(message.roomId); + await this.fishjamClient.deletePeer(message.roomId, activePeers[0].id); + } + } + + handleTrackAdded(message: TrackAdded) { + if (!message.track || message.track.type !== 1) return; + + const trackId = message.track.id as TrackId; + const tracks = this.videoTracks.get(message.roomId); + if (tracks) { + tracks.add(trackId); + console.log(`Video track ${trackId} added in room ${message.roomId}`); + } + } + + handleTrackRemoved(message: TrackRemoved) { + if (!message.track) return; + + const trackId = message.track.id as TrackId; + const tracks = this.videoTracks.get(message.roomId); + if (tracks) 
{ + tracks.delete(trackId); + console.log(`Video track ${trackId} removed from room ${message.roomId}`); + } + } + + handleTrackData(message: IncomingTrackData) { + const { data, peerId } = message; + const session = this.peerSessions.get(peerId); + + session?.sendRealtimeInput({ + audio: { + data: data.toBase64(), + mimeType: GeminiIntegration.inputMimeType, + }, + }); + } + + handleTrackImage(roomId: RoomId, message: IncomingTrackImage) { + const { contentType, data } = message; + + for (const [peerId, session] of this.peerSessions) { + session.sendRealtimeInput({ + media: { + data: Buffer.from(data).toString('base64'), + mimeType: contentType, + }, + }); + } + } + + handleGeminiMessage(roomId: RoomId, peerId: PeerId, msg: LiveServerMessage) { + const agentState = this.agents.get(roomId); + if (!agentState) return; + + const audioData = msg.serverContent?.modelTurn?.parts?.[0]?.inlineData; + if (audioData?.data) { + const buffer = Buffer.from(audioData.data, 'base64'); + agentState.agent.sendData(agentState.outputTrackId, new Uint8Array(buffer)); + } + + const transcription = msg.serverContent?.inputTranscription?.text; + if (transcription) console.log(`Peer ${peerId} said: "${transcription}".`); + } + + private startImageCapture(roomId: RoomId) { + const interval = setInterval(() => { + const agentState = this.agents.get(roomId); + const tracks = this.videoTracks.get(roomId); + + if (!agentState || !tracks || tracks.size === 0) return; + + for (const trackId of tracks) { + console.log('Sending image capture request for track', trackId); + agentState.agent.captureImage(trackId); + } + }, CAPTURE_INTERVAL_MS); + + this.captureIntervals.set(roomId, interval); + } + + private stopImageCapture(roomId: RoomId) { + const interval = this.captureIntervals.get(roomId); + if (interval) { + clearInterval(interval); + this.captureIntervals.delete(roomId); + } + } +} diff --git a/examples/multimodal/tsconfig.json b/examples/multimodal/tsconfig.json new file mode 100644 index 
00000000..d14f04c0 --- /dev/null +++ b/examples/multimodal/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2021", + "module": "ES2022", + "moduleResolution": "bundler", + "types": ["bun-types"], + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipLibCheck": true, + "noEmit": true, + "jsx": "preserve" + } +} diff --git a/packages/fishjam-proto/protos b/packages/fishjam-proto/protos index 129e3774..9d807b55 160000 --- a/packages/fishjam-proto/protos +++ b/packages/fishjam-proto/protos @@ -1 +1 @@ -Subproject commit 129e3774291fa899d781b5ee804a42146fbe8a78 +Subproject commit 9d807b55279de385136f82b12f5df75d73104514 diff --git a/packages/js-server-sdk/src/agent.ts b/packages/js-server-sdk/src/agent.ts index 56503913..6584a0f7 100644 --- a/packages/js-server-sdk/src/agent.ts +++ b/packages/js-server-sdk/src/agent.ts @@ -7,6 +7,7 @@ import { AgentRequest_TrackData, AgentResponse, AgentResponse_TrackData, + AgentResponse_TrackImage, Track as ProtoTrack, TrackType as ProtoTrackType, TrackEncoding, @@ -14,13 +15,14 @@ import { import { AgentCallbacks, Brand, FishjamConfig, PeerId } from './types'; import { getFishjamUrl, httpToWebsocket, WithPeerId } from './utils'; -const expectedEventsList = ['trackData'] as const; +const expectedEventsList = ['trackData', 'trackImage'] as const; /** * @useDeclaredType */ export type ExpectedAgentEvents = (typeof expectedEventsList)[number]; export type IncomingTrackData = Omit, 'peerId'> & { peerId: PeerId }; +export type IncomingTrackImage = NonNullable; export type OutgoingTrackData = Omit, 'peerId'> & { peerId: PeerId }; export type AgentTrack = Omit & { id: TrackId }; @@ -124,6 +126,15 @@ export class FishjamAgent extends (EventEmitter as new () => TypedEmitter