move examples gateway #992

Open. Wants to merge 17 commits into base: main.
46 changes: 41 additions & 5 deletions AudioQnA/audioqna.py
@@ -4,9 +4,11 @@
import asyncio
import os

-from comps import AudioQnAGateway, MicroService, ServiceOrchestrator, ServiceType
from comps import Gateway, MegaServiceEndpoint, MicroService, ServiceOrchestrator, ServiceType
from comps.cores.proto.api_protocol import AudioChatCompletionRequest, ChatCompletionResponse
from comps.cores.proto.docarray import LLMParams
from fastapi import Request

MEGA_SERVICE_HOST_IP = os.getenv("MEGA_SERVICE_HOST_IP", "0.0.0.0")
MEGA_SERVICE_PORT = int(os.getenv("MEGA_SERVICE_PORT", 8888))
ASR_SERVICE_HOST_IP = os.getenv("ASR_SERVICE_HOST_IP", "0.0.0.0")
ASR_SERVICE_PORT = int(os.getenv("ASR_SERVICE_PORT", 9099))
@@ -16,7 +18,7 @@
TTS_SERVICE_PORT = int(os.getenv("TTS_SERVICE_PORT", 9088))


-class AudioQnAService:
class AudioQnAService(Gateway):
def __init__(self, host="0.0.0.0", port=8000):
self.host = host
self.port = port
@@ -50,9 +52,43 @@ def add_remote_service(self):
self.megaservice.add(asr).add(llm).add(tts)
self.megaservice.flow_to(asr, llm)
self.megaservice.flow_to(llm, tts)
-self.gateway = AudioQnAGateway(megaservice=self.megaservice, host="0.0.0.0", port=self.port)

async def handle_request(self, request: Request):
data = await request.json()

chat_request = AudioChatCompletionRequest.parse_obj(data)
parameters = LLMParams(
# use a relatively low max_tokens for audio conversations
max_tokens=chat_request.max_tokens if chat_request.max_tokens else 128,
top_k=chat_request.top_k if chat_request.top_k else 10,
top_p=chat_request.top_p if chat_request.top_p else 0.95,
temperature=chat_request.temperature if chat_request.temperature else 0.01,
frequency_penalty=chat_request.frequency_penalty if chat_request.frequency_penalty else 0.0,
presence_penalty=chat_request.presence_penalty if chat_request.presence_penalty else 0.0,
repetition_penalty=chat_request.repetition_penalty if chat_request.repetition_penalty else 1.03,
streaming=False, # TODO add streaming LLM output as input to TTS
)
result_dict, runtime_graph = await self.megaservice.schedule(
initial_inputs={"byte_str": chat_request.audio}, llm_parameters=parameters
)

last_node = runtime_graph.all_leaves()[-1]
response = result_dict[last_node]["byte_str"]

return response

def start(self):
super().__init__(
megaservice=self.megaservice,
host=self.host,
port=self.port,
endpoint=str(MegaServiceEndpoint.AUDIO_QNA),
input_datatype=AudioChatCompletionRequest,
output_datatype=ChatCompletionResponse,
)


if __name__ == "__main__":
-audioqna = AudioQnAService(host=MEGA_SERVICE_HOST_IP, port=MEGA_SERVICE_PORT)
audioqna = AudioQnAService(port=MEGA_SERVICE_PORT)
audioqna.add_remote_service()
audioqna.start()
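
Since AudioQnAService now is the gateway, the quickest way to exercise the refactor is to POST audio straight to the megaservice port. Below is a hypothetical smoke test: the /v1/audioqna path and the base64 "audio" field are assumptions based on MegaServiceEndpoint.AUDIO_QNA and AudioChatCompletionRequest, not something this diff states.

# Hypothetical smoke test for the refactored AudioQnA gateway. Assumes
# MegaServiceEndpoint.AUDIO_QNA resolves to /v1/audioqna and that the
# request carries base64-encoded audio in the "audio" field.
import base64

import requests

with open("sample.wav", "rb") as f:
    audio_b64 = base64.b64encode(f.read()).decode("utf-8")

resp = requests.post(
    "http://localhost:8888/v1/audioqna",
    json={"audio": audio_b64, "max_tokens": 64},
    timeout=120,
)
resp.raise_for_status()
# handle_request returns the synthesized answer as a TTS byte string.
print(resp.text[:80])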
83 changes: 76 additions & 7 deletions ChatQnA/chatqna.py
@@ -6,7 +6,17 @@
import os
import re

-from comps import ChatQnAGateway, MicroService, ServiceOrchestrator, ServiceType
from comps import Gateway, MegaServiceEndpoint, MicroService, ServiceOrchestrator, ServiceType
from comps.cores.proto.api_protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionResponseChoice,
ChatMessage,
UsageInfo,
)
from comps.cores.proto.docarray import LLMParams, RerankerParms, RetrieverParms
from fastapi import Request
from fastapi.responses import StreamingResponse
from langchain_core.prompts import PromptTemplate


@@ -35,7 +45,6 @@ def generate_rag_prompt(question, documents):
return template.format(context=context_str, question=question)


-MEGA_SERVICE_HOST_IP = os.getenv("MEGA_SERVICE_HOST_IP", "0.0.0.0")
MEGA_SERVICE_PORT = int(os.getenv("MEGA_SERVICE_PORT", 8888))
GUARDRAIL_SERVICE_HOST_IP = os.getenv("GUARDRAIL_SERVICE_HOST_IP", "0.0.0.0")
GUARDRAIL_SERVICE_PORT = int(os.getenv("GUARDRAIL_SERVICE_PORT", 80))
@@ -173,13 +182,14 @@ def align_generator(self, gen, **kwargs):
yield "data: [DONE]\n\n"


-class ChatQnAService:
class ChatQnAService(Gateway):
def __init__(self, host="0.0.0.0", port=8000):
self.host = host
self.port = port
ServiceOrchestrator.align_inputs = align_inputs
ServiceOrchestrator.align_outputs = align_outputs
ServiceOrchestrator.align_generator = align_generator

self.megaservice = ServiceOrchestrator()

def add_remote_service(self):
@@ -223,7 +233,6 @@ def add_remote_service(self):
self.megaservice.flow_to(embedding, retriever)
self.megaservice.flow_to(retriever, rerank)
self.megaservice.flow_to(rerank, llm)
-self.gateway = ChatQnAGateway(megaservice=self.megaservice, host="0.0.0.0", port=self.port)

def add_remote_service_without_rerank(self):

@@ -256,7 +265,6 @@ def add_remote_service_without_rerank(self):
self.megaservice.add(embedding).add(retriever).add(llm)
self.megaservice.flow_to(embedding, retriever)
self.megaservice.flow_to(retriever, llm)
-self.gateway = ChatQnAGateway(megaservice=self.megaservice, host="0.0.0.0", port=self.port)

def add_remote_service_with_guardrails(self):
guardrail_in = MicroService(
@@ -314,7 +322,66 @@ def add_remote_service_with_guardrails(self):
self.megaservice.flow_to(retriever, rerank)
self.megaservice.flow_to(rerank, llm)
# self.megaservice.flow_to(llm, guardrail_out)
-self.gateway = ChatQnAGateway(megaservice=self.megaservice, host="0.0.0.0", port=self.port)

async def handle_request(self, request: Request):
data = await request.json()
stream_opt = data.get("stream", True)
chat_request = ChatCompletionRequest.parse_obj(data)
prompt = self._handle_message(chat_request.messages)
parameters = LLMParams(
max_tokens=chat_request.max_tokens if chat_request.max_tokens else 1024,
top_k=chat_request.top_k if chat_request.top_k else 10,
top_p=chat_request.top_p if chat_request.top_p else 0.95,
temperature=chat_request.temperature if chat_request.temperature else 0.01,
frequency_penalty=chat_request.frequency_penalty if chat_request.frequency_penalty else 0.0,
presence_penalty=chat_request.presence_penalty if chat_request.presence_penalty else 0.0,
repetition_penalty=chat_request.repetition_penalty if chat_request.repetition_penalty else 1.03,
streaming=stream_opt,
chat_template=chat_request.chat_template if chat_request.chat_template else None,
)
retriever_parameters = RetrieverParms(
search_type=chat_request.search_type if chat_request.search_type else "similarity",
k=chat_request.k if chat_request.k else 4,
distance_threshold=chat_request.distance_threshold if chat_request.distance_threshold else None,
fetch_k=chat_request.fetch_k if chat_request.fetch_k else 20,
lambda_mult=chat_request.lambda_mult if chat_request.lambda_mult else 0.5,
score_threshold=chat_request.score_threshold if chat_request.score_threshold else 0.2,
)
reranker_parameters = RerankerParms(
top_n=chat_request.top_n if chat_request.top_n else 1,
)
result_dict, runtime_graph = await self.megaservice.schedule(
initial_inputs={"text": prompt},
llm_parameters=parameters,
retriever_parameters=retriever_parameters,
reranker_parameters=reranker_parameters,
)
for node, response in result_dict.items():
if isinstance(response, StreamingResponse):
return response
last_node = runtime_graph.all_leaves()[-1]
response = result_dict[last_node]["text"]
choices = []
usage = UsageInfo()
choices.append(
ChatCompletionResponseChoice(
index=0,
message=ChatMessage(role="assistant", content=response),
finish_reason="stop",
)
)
return ChatCompletionResponse(model="chatqna", choices=choices, usage=usage)

def start(self):

super().__init__(
megaservice=self.megaservice,
host=self.host,
port=self.port,
endpoint=str(MegaServiceEndpoint.CHAT_QNA),
input_datatype=ChatCompletionRequest,
output_datatype=ChatCompletionResponse,
)


if __name__ == "__main__":
@@ -324,10 +391,12 @@ def add_remote_service_with_guardrails(self):

args = parser.parse_args()

-chatqna = ChatQnAService(host=MEGA_SERVICE_HOST_IP, port=MEGA_SERVICE_PORT)
chatqna = ChatQnAService(port=MEGA_SERVICE_PORT)
if args.without_rerank:
chatqna.add_remote_service_without_rerank()
elif args.with_guardrails:
chatqna.add_remote_service_with_guardrails()
else:
chatqna.add_remote_service()

chatqna.start()
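
The start() methods above hand everything to Gateway.__init__, so the refactor hinges on what the base class does with the endpoint and datatype arguments. A hypothetical sketch of that contract follows; the real comps.Gateway will differ in detail, this only illustrates the wiring the subclasses rely on.

from fastapi import FastAPI, Request
import uvicorn


class Gateway:
    # Hypothetical sketch, not the actual comps implementation.
    def __init__(self, megaservice, host, port, endpoint, input_datatype, output_datatype):
        self.megaservice = megaservice
        self.input_datatype = input_datatype
        self.output_datatype = output_datatype
        app = FastAPI()
        # POSTs on the configured endpoint go to the subclass's
        # handle_request, which schedules the megaservice graph.
        app.add_api_route(endpoint, self.handle_request, methods=["POST"])
        # Serving here is why the subclasses only call super().__init__ from start().
        uvicorn.run(app, host=host, port=port)

    async def handle_request(self, request: Request):
        raise NotImplementedError

Under this contract each service subclass supplies only the graph wiring plus a handle_request override, and start() blocks while the endpoint is served.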
68 changes: 63 additions & 5 deletions CodeGen/codegen.py
@@ -4,15 +4,24 @@
import asyncio
import os

-from comps import CodeGenGateway, MicroService, ServiceOrchestrator, ServiceType
from comps import Gateway, MegaServiceEndpoint, MicroService, ServiceOrchestrator, ServiceType
from comps.cores.proto.api_protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionResponseChoice,
ChatMessage,
UsageInfo,
)
from comps.cores.proto.docarray import LLMParams
from fastapi import Request
from fastapi.responses import StreamingResponse

MEGA_SERVICE_HOST_IP = os.getenv("MEGA_SERVICE_HOST_IP", "0.0.0.0")
MEGA_SERVICE_PORT = int(os.getenv("MEGA_SERVICE_PORT", 7778))
LLM_SERVICE_HOST_IP = os.getenv("LLM_SERVICE_HOST_IP", "0.0.0.0")
LLM_SERVICE_PORT = int(os.getenv("LLM_SERVICE_PORT", 9000))


-class CodeGenService:
class CodeGenService(Gateway):
def __init__(self, host="0.0.0.0", port=8000):
self.host = host
self.port = port
@@ -28,9 +37,58 @@ def add_remote_service(self):
service_type=ServiceType.LLM,
)
self.megaservice.add(llm)
-self.gateway = CodeGenGateway(megaservice=self.megaservice, host="0.0.0.0", port=self.port)

async def handle_request(self, request: Request):
data = await request.json()
stream_opt = data.get("stream", True)
chat_request = ChatCompletionRequest.parse_obj(data)
prompt = self._handle_message(chat_request.messages)
parameters = LLMParams(
max_tokens=chat_request.max_tokens if chat_request.max_tokens else 1024,
top_k=chat_request.top_k if chat_request.top_k else 10,
top_p=chat_request.top_p if chat_request.top_p else 0.95,
temperature=chat_request.temperature if chat_request.temperature else 0.01,
frequency_penalty=chat_request.frequency_penalty if chat_request.frequency_penalty else 0.0,
presence_penalty=chat_request.presence_penalty if chat_request.presence_penalty else 0.0,
repetition_penalty=chat_request.repetition_penalty if chat_request.repetition_penalty else 1.03,
streaming=stream_opt,
)
result_dict, runtime_graph = await self.megaservice.schedule(
initial_inputs={"query": prompt}, llm_parameters=parameters
)
for node, response in result_dict.items():
# Here we assume the last microservice in the megaservice is the LLM.
if (
isinstance(response, StreamingResponse)
and node == list(self.megaservice.services.keys())[-1]
and self.megaservice.services[node].service_type == ServiceType.LLM
):
return response
last_node = runtime_graph.all_leaves()[-1]
response = result_dict[last_node]["text"]
choices = []
usage = UsageInfo()
choices.append(
ChatCompletionResponseChoice(
index=0,
message=ChatMessage(role="assistant", content=response),
finish_reason="stop",
)
)
return ChatCompletionResponse(model="codegen", choices=choices, usage=usage)

def start(self):
super().__init__(
megaservice=self.megaservice,
host=self.host,
port=self.port,
endpoint=str(MegaServiceEndpoint.CODE_GEN),
input_datatype=ChatCompletionRequest,
output_datatype=ChatCompletionResponse,
)


if __name__ == "__main__":
-chatqna = CodeGenService(host=MEGA_SERVICE_HOST_IP, port=MEGA_SERVICE_PORT)
chatqna = CodeGenService(port=MEGA_SERVICE_PORT)
chatqna.add_remote_service()
chatqna.start()
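
With the gateway merged in, CodeGen is driven by a plain POST to the megaservice port (7778 by default, per the constants above). A hypothetical non-streaming call, assuming MegaServiceEndpoint.CODE_GEN resolves to /v1/codegen and that _handle_message accepts a bare string prompt:

# Hypothetical client call against the refactored CodeGen gateway; the
# /v1/codegen path is an assumption based on MegaServiceEndpoint.CODE_GEN.
import requests

payload = {
    "messages": "Write a Python function that checks whether a number is prime.",
    "stream": False,
}
resp = requests.post("http://localhost:7778/v1/codegen", json=payload, timeout=120)
resp.raise_for_status()
# With stream=False the handler returns a ChatCompletionResponse.
print(resp.json()["choices"][0]["message"]["content"])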
68 changes: 63 additions & 5 deletions CodeTrans/code_translation.py
@@ -4,15 +4,23 @@
import asyncio
import os

-from comps import CodeTransGateway, MicroService, ServiceOrchestrator
from comps import Gateway, MegaServiceEndpoint, MicroService, ServiceOrchestrator, ServiceType
from comps.cores.proto.api_protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionResponseChoice,
ChatMessage,
UsageInfo,
)
from fastapi import Request
from fastapi.responses import StreamingResponse

MEGA_SERVICE_HOST_IP = os.getenv("MEGA_SERVICE_HOST_IP", "0.0.0.0")
MEGA_SERVICE_PORT = int(os.getenv("MEGA_SERVICE_PORT", 7777))
LLM_SERVICE_HOST_IP = os.getenv("LLM_SERVICE_HOST_IP", "0.0.0.0")
LLM_SERVICE_PORT = int(os.getenv("LLM_SERVICE_PORT", 9000))


-class CodeTransService:
class CodeTransService(Gateway):
def __init__(self, host="0.0.0.0", port=8000):
self.host = host
self.port = port
@@ -27,9 +35,59 @@ def add_remote_service(self):
use_remote_service=True,
)
self.megaservice.add(llm)
-self.gateway = CodeTransGateway(megaservice=self.megaservice, host="0.0.0.0", port=self.port)

async def handle_request(self, request: Request):
data = await request.json()
language_from = data["language_from"]
language_to = data["language_to"]
source_code = data["source_code"]
prompt_template = """
### System: Please translate the following {language_from} code into {language_to} code.

### Original code:
'''{language_from}

{source_code}

'''

### Translated code:
"""
prompt = prompt_template.format(language_from=language_from, language_to=language_to, source_code=source_code)
result_dict, runtime_graph = await self.megaservice.schedule(initial_inputs={"query": prompt})
for node, response in result_dict.items():
# Here we assume the last microservice in the megaservice is the LLM.
if (
isinstance(response, StreamingResponse)
and node == list(self.megaservice.services.keys())[-1]
and self.megaservice.services[node].service_type == ServiceType.LLM
):
return response
last_node = runtime_graph.all_leaves()[-1]
response = result_dict[last_node]["text"]
choices = []
usage = UsageInfo()
choices.append(
ChatCompletionResponseChoice(
index=0,
message=ChatMessage(role="assistant", content=response),
finish_reason="stop",
)
)
return ChatCompletionResponse(model="codetrans", choices=choices, usage=usage)

def start(self):
super().__init__(
megaservice=self.megaservice,
host=self.host,
port=self.port,
endpoint=str(MegaServiceEndpoint.CODE_TRANS),
input_datatype=ChatCompletionRequest,
output_datatype=ChatCompletionResponse,
)


if __name__ == "__main__":
-service_ochestrator = CodeTransService(host=MEGA_SERVICE_HOST_IP, port=MEGA_SERVICE_PORT)
service_ochestrator = CodeTransService(port=MEGA_SERVICE_PORT)
service_ochestrator.add_remote_service()
service_ochestrator.start()
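
The CodeTrans handler reads language_from, language_to, and source_code straight from the request body, so a client needs exactly those three fields. In the sketch below the /v1/codetrans path and port 7777 are assumptions based on MegaServiceEndpoint.CODE_TRANS and MEGA_SERVICE_PORT, and parsing the JSON body assumes the LLM did not stream.

# Example request for the CodeTrans gateway; the field names come straight
# from handle_request above, while the path and port are assumptions.
import requests

payload = {
    "language_from": "Python",
    "language_to": "Go",
    "source_code": "def add(a, b):\n    return a + b\n",
}
resp = requests.post("http://localhost:7777/v1/codetrans", json=payload, timeout=120)
resp.raise_for_status()
# Non-streaming responses arrive as a ChatCompletionResponse.
print(resp.json()["choices"][0]["message"]["content"])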