Commit 0f2ea291 by glide-the

Adjust the project structure to support remote LLM calls for answer generation. Add fastchat_openai_llm.py, which calls FastChat through its OpenAI-compatible message format.

Parent f5a85a19
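For context, a minimal usage sketch of the new remote model, mirroring the "fastChatOpenAI" config entry below and the test added in this commit. It assumes a FastChat OpenAI-compatible API server is already running at http://localhost:8000/v1 and serving chatglm-6b; the argument values are taken from the test file.

    # Sketch only: load the remote FastChatOpenAI model through shared.loaderLLM
    from models.loader.args import parser
    from models.loader import LoaderCheckPoint
    import models.shared as shared

    args = parser.parse_args(args=['--model-dir', '/media/checkpoint/',
                                   '--model', 'fastChatOpenAI', '--no-remote-model'])
    shared.loaderCheckPoint = LoaderCheckPoint(vars(args))

    llm = shared.loaderLLM()                          # "provides": "FastChatOpenAILLM"
    llm.set_api_base_url("http://localhost:8000/v1")  # OpenAI-compatible endpoint
    llm.call_model_name("chatglm-6b")                 # model served by the FastChat worker

    for answer_result in llm.generatorAnswer(prompt="Hello", history=[], streaming=False):
        print(answer_result.llm_output["answer"])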
@@ -69,11 +69,11 @@ llm_model_dict = {
         "local_model_path": None,
         "provides": "LLamaLLM"
     },
-    "fastChat": {
-        "name": "fastChat",
-        "pretrained_model_name": "fastChat",
+    "fastChatOpenAI": {
+        "name": "FastChatOpenAI",
+        "pretrained_model_name": "FastChatOpenAI",
         "local_model_path": None,
-        "provides": "FastChatLLM"
+        "provides": "FastChatOpenAILLM"
     }
 }
...
 from .chatglm_llm import ChatGLM
 from .llama_llm import LLamaLLM
 from .moss_llm import MOSSLLM
-from .fastchat_llm import FastChatLLM
+from .fastchat_openai_llm import FastChatOpenAILLM
@@ -2,8 +2,12 @@ from models.base.base import (
     AnswerResult,
     BaseAnswer
 )
+from models.base.remote_rpc_model import (
+    RemoteRpcModel
+)

 __all__ = [
     "AnswerResult",
     "BaseAnswer",
+    "RemoteRpcModel",
 ]
from abc import ABC, abstractmethod
import torch
from models.base import (BaseAnswer,
AnswerResult)
class MultimodalAnswerResult(AnswerResult):
image: str = None
class LavisBlip2Multimodal(BaseAnswer, ABC):
@property
@abstractmethod
def _blip2_instruct(self) -> any:
"""Return _blip2_instruct of blip2."""
@property
@abstractmethod
def _image_blip2_vis_processors(self) -> dict:
"""Return _image_blip2_vis_processors of blip2 image processors."""
@abstractmethod
def set_image_path(self, image_path: str):
"""set set_image_path"""
from abc import ABC, abstractmethod
import torch
from models.base import (BaseAnswer,
AnswerResult)
class MultimodalAnswerResult(AnswerResult):
image: str = None
class RemoteRpcModel(BaseAnswer, ABC):
@property
@abstractmethod
def _api_key(self) -> str:
"""Return _api_key of client."""
@property
@abstractmethod
def _api_base_url(self) -> str:
"""Return _api_base of client host bash url."""
@abstractmethod
def set_api_key(self, api_key: str):
"""set set_api_key"""
@abstractmethod
def set_api_base_url(self, api_base_url: str):
"""set api_base_url"""
@abstractmethod
def call_model_name(self, model_name):
"""call model name of client"""
"""Wrapper around FastChat APIs."""
from __future__ import annotations
import logging
import sys
import warnings
from abc import ABC
from typing import (
AbstractSet,
Any,
Callable,
Collection,
Dict,
Generator,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
Union,
)
from pydantic import Extra, Field, root_validator
from tenacity import (
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from langchain.llms.base import BaseLLM
from langchain.schema import Generation, LLMResult
from langchain.utils import get_from_dict_or_env
from models.base import (RemoteRpcModel,
AnswerResult)
from models.loader import LoaderCheckPoint
import requests
import json
logger = logging.getLogger(__name__)
def _streaming_response_template() -> Dict[str, Any]:
"""
:return: 响应结构
"""
return {
"text": "",
"error_code": 0,
}
def _update_response(response: Dict[str, Any], stream_response: Dict[str, Any]) -> None:
"""Update response from the stream response."""
response["text"] += stream_response["text"]
response["error_code"] += stream_response["error_code"]
class BaseFastChat(BaseLLM):
"""Wrapper around FastChat large language models."""
api_base_url: str = "http://localhost:21002/worker_generate_stream"
model_name: str = "text-davinci-003"
"""Model name to use."""
temperature: float = 0.7
"""What sampling temperature to use."""
    max_new_tokens: int = 200
    """Maximum number of new tokens to generate."""
    stop: int = 20
    batch_size: int = 20
    """Number of prompts grouped into each request batch."""
    streaming: bool = False
    """Whether to stream the results or not."""
    n: int = 1
    """How many completions to generate for each prompt."""
    allowed_special: Union[Literal["all"], AbstractSet[str]] = set()
    """Set of special tokens that are allowed."""
    disallowed_special: Union[Literal["all"], Collection[str]] = "all"
    """Set of special tokens that are not allowed."""
class Config:
"""Configuration for this pydantic object."""
extra = Extra.ignore
@root_validator(pre=True)
def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Build extra kwargs from additional params that were passed in."""
all_required_field_names = {field.alias for field in cls.__fields__.values()}
extra = values.get("model_kwargs", {})
for field_name in list(values):
if field_name not in all_required_field_names:
if field_name in extra:
raise ValueError(f"Found {field_name} supplied twice.")
logger.warning(
f"""WARNING! {field_name} is not default parameter.
{field_name} was transfered to model_kwargs.
Please confirm that {field_name} is what you intended."""
)
extra[field_name] = values.pop(field_name)
values["model_kwargs"] = extra
return values
@property
def _default_params(self) -> Dict[str, Any]:
"""Get the default parameters for calling FastChat API."""
normal_params = {
"model": self.model_name,
"prompt": '',
"max_new_tokens": self.max_new_tokens,
"temperature": self.temperature,
}
return {**normal_params}
def _generate(
self, prompts: List[str], stop: Optional[List[str]] = None
) -> LLMResult:
"""Call out to FastChat's endpoint with k unique prompts.
Args:
prompts: The prompts to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The full LLM output.
Example:
.. code-block:: python
response = fastchat.generate(["Tell me a joke."])
"""
# TODO: write a unit test for this
params = self._invocation_params
sub_prompts = self.get_sub_prompts(params, prompts)
choices = []
token_usage: Dict[str, int] = {}
headers = {"User-Agent": "fastchat Client"}
for _prompts in sub_prompts:
params["prompt"] = _prompts[0]
if stop is not None:
if "stop" in params:
raise ValueError("`stop` found in both the input and default params.")
params["stop"] = stop
if self.streaming:
if len(_prompts) > 1:
raise ValueError("Cannot stream results with multiple prompts.")
response_template = _streaming_response_template()
response = requests.post(
self.api_base_url,
headers=headers,
json=params,
stream=True,
)
for stream_resp in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b"\0"
):
if stream_resp:
data = json.loads(stream_resp.decode("utf-8"))
skip_echo_len = len(_prompts[0])
output = data["text"][skip_echo_len:].strip()
data["text"] = output
self.callback_manager.on_llm_new_token(
output,
verbose=self.verbose,
logprobs=data["error_code"],
)
_update_response(response_template, data)
choices.append(response_template)
else:
response_template = _streaming_response_template()
response = requests.post(
self.api_base_url,
headers=headers,
json=params,
stream=True,
)
for stream_resp in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b"\0"
):
if stream_resp:
data = json.loads(stream_resp.decode("utf-8"))
skip_echo_len = len(_prompts[0])
output = data["text"][skip_echo_len:].strip()
data["text"] = output
_update_response(response_template, data)
choices.append(response_template)
return self.create_llm_result(choices, prompts, token_usage)
async def _agenerate(
self, prompts: List[str], stop: Optional[List[str]] = None
) -> LLMResult:
"""Call out to FastChat's endpoint async with k unique prompts."""
params = self._invocation_params
sub_prompts = self.get_sub_prompts(params, prompts)
choices = []
token_usage: Dict[str, int] = {}
headers = {"User-Agent": "fastchat Client"}
for _prompts in sub_prompts:
params["prompt"] = _prompts[0]
if stop is not None:
if "stop" in params:
raise ValueError("`stop` found in both the input and default params.")
params["stop"] = stop
if self.streaming:
if len(_prompts) > 1:
raise ValueError("Cannot stream results with multiple prompts.")
response_template = _streaming_response_template()
response = requests.post(
self.api_base_url,
headers=headers,
json=params,
stream=True,
)
for stream_resp in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b"\0"
):
if stream_resp:
data = json.loads(stream_resp.decode("utf-8"))
skip_echo_len = len(_prompts[0])
output = data["text"][skip_echo_len:].strip()
data["text"] = output
self.callback_manager.on_llm_new_token(
output,
verbose=self.verbose,
logprobs=data["error_code"],
)
_update_response(response_template, data)
choices.append(response_template)
else:
response_template = _streaming_response_template()
response = requests.post(
self.api_base_url,
headers=headers,
json=params,
stream=True,
)
for stream_resp in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b"\0"
):
if stream_resp:
data = json.loads(stream_resp.decode("utf-8"))
skip_echo_len = len(_prompts[0])
output = data["text"][skip_echo_len:].strip()
data["text"] = output
_update_response(response_template, data)
choices.append(response_template)
return self.create_llm_result(choices, prompts, token_usage)
def get_sub_prompts(
self,
params: Dict[str, Any],
prompts: List[str],
) -> List[List[str]]:
"""Get the sub prompts for llm call."""
if params["max_new_tokens"] == -1:
if len(prompts) != 1:
raise ValueError(
"max_new_tokens set to -1 not supported for multiple inputs."
)
params["max_new_tokens"] = self.max_new_tokens_for_prompt(prompts[0])
        # split the prompts into batches of batch_size
sub_prompts = [
prompts[i: i + self.batch_size]
for i in range(0, len(prompts), self.batch_size)
]
return sub_prompts
def create_llm_result(
self, choices: Any, prompts: List[str], token_usage: Dict[str, int]
) -> LLMResult:
"""Create the LLMResult from the choices and prompts."""
generations = []
for i, _ in enumerate(prompts):
sub_choices = choices[i * self.n: (i + 1) * self.n]
generations.append(
[
Generation(
text=choice["text"],
generation_info=dict(
finish_reason='over',
logprobs=choice["text"],
),
)
for choice in sub_choices
]
)
llm_output = {"token_usage": token_usage, "model_name": self.model_name}
return LLMResult(generations=generations, llm_output=llm_output)
def stream(self, prompt: str, stop: Optional[List[str]] = None) -> Generator:
"""Call FastChat with streaming flag and return the resulting generator.
BETA: this is a beta feature while we figure out the right abstraction.
Once that happens, this interface could change.
Args:
prompt: The prompts to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
            A generator representing the stream of tokens from FastChat.
Example:
.. code-block:: python
generator = fastChat.stream("Tell me a joke.")
for token in generator:
yield token
"""
params = self._invocation_params
params["prompt"] = prompt
if stop is not None:
if "stop" in params:
raise ValueError("`stop` found in both the input and default params.")
params["stop"] = stop
headers = {"User-Agent": "fastchat Client"}
response = requests.post(
self.api_base_url,
headers=headers,
json=params,
stream=True,
)
for stream_resp in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b"\0"
):
if stream_resp:
data = json.loads(stream_resp.decode("utf-8"))
skip_echo_len = len(prompt)
output = data["text"][skip_echo_len:].strip()
data["text"] = output
yield data
@property
def _invocation_params(self) -> Dict[str, Any]:
"""Get the parameters used to invoke the model."""
return self._default_params
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {**{"model_name": self.model_name}, **self._default_params}
@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "fastChat"
def get_num_tokens(self, text: str) -> int:
"""Calculate num tokens with tiktoken package."""
# tiktoken NOT supported for Python < 3.8
if sys.version_info[1] < 8:
return super().get_num_tokens(text)
try:
import tiktoken
except ImportError:
raise ValueError(
"Could not import tiktoken python package. "
"This is needed in order to calculate get_num_tokens. "
"Please install it with `pip install tiktoken`."
)
enc = tiktoken.encoding_for_model(self.model_name)
tokenized_text = enc.encode(
text,
allowed_special=self.allowed_special,
disallowed_special=self.disallowed_special,
)
# calculate the number of tokens in the encoded text
return len(tokenized_text)
def modelname_to_contextsize(self, modelname: str) -> int:
"""Calculate the maximum number of tokens possible to generate for a model.
Args:
modelname: The modelname we want to know the context size for.
Returns:
The maximum context size
Example:
.. code-block:: python
                max_new_tokens = fastchat.modelname_to_contextsize("vicuna-13b")
"""
model_token_mapping = {
"vicuna-13b": 2049,
"koala": 2049,
"dolly-v2": 2049,
"oasst": 2049,
"stablelm": 2049,
}
context_size = model_token_mapping.get(modelname, None)
if context_size is None:
raise ValueError(
f"Unknown model: {modelname}. Please provide a valid OpenAI model name."
"Known models are: " + ", ".join(model_token_mapping.keys())
)
return context_size
def max_new_tokens_for_prompt(self, prompt: str) -> int:
"""Calculate the maximum number of tokens possible to generate for a prompt.
Args:
prompt: The prompt to pass into the model.
Returns:
The maximum number of tokens to generate for a prompt.
Example:
.. code-block:: python
                max_new_tokens = fastchat.max_new_tokens_for_prompt("Tell me a joke.")
"""
num_tokens = self.get_num_tokens(prompt)
# get max context size for model by name
max_size = self.modelname_to_contextsize(self.model_name)
return max_size - num_tokens
class FastChatAPILLM(RemoteRpcModel, BaseFastChat, ABC):
"""Wrapper around FastChat large language models.
Example:
.. code-block:: python
            fastchat = FastChatAPILLM()
            fastchat.call_model_name("vicuna")
"""
checkPoint: LoaderCheckPoint = None
history_len: int = 10
def __init__(self, checkPoint: LoaderCheckPoint = None):
super().__init__()
self.checkPoint = checkPoint
@property
def _invocation_params(self) -> Dict[str, Any]:
return {**{"model": self.model_name}, **super()._invocation_params}
@property
def _check_point(self) -> LoaderCheckPoint:
return self.checkPoint
@property
def _history_len(self) -> int:
return self.history_len
def set_history_len(self, history_len: int = 10) -> None:
self.history_len = history_len
@property
def _api_key(self) -> str:
pass
@property
def _api_base_url(self) -> str:
return self.api_base_url
def set_api_key(self, api_key: str):
pass
def set_api_base_url(self, api_base_url: str):
self.api_base_url = api_base_url
def call_model_name(self, model_name):
self.model_name = model_name
    def generatorAnswer(self, prompt: str,
                        history: List[List[str]] = [],
                        streaming: bool = False):
        # Stream the caller's prompt; the worker returns cumulative text,
        # so the last chunk holds the full answer.
        response_text = ""
        for token in self.stream(prompt):
            yield token
            response_text = token["text"]

        history += [[prompt, response_text]]
        answer_result = AnswerResult()
        answer_result.history = history
        answer_result.llm_output = {"answer": response_text}
        yield answer_result
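The wrapper above talks to a FastChat model worker's worker_generate_stream endpoint. As a reference point, the raw protocol it handles looks roughly like the sketch below: the payload fields follow _default_params, and the response is a stream of b"\0"-delimited JSON chunks carrying a cumulative "text" field and an "error_code", which _update_response merges. The model name is an assumption.

    # Sketch only: call the FastChat worker endpoint directly with requests,
    # the same way BaseFastChat._generate does above.
    import json
    import requests

    payload = {
        "model": "vicuna-13b",        # assumed model name served by the worker
        "prompt": "Tell me a joke.",
        "max_new_tokens": 200,
        "temperature": 0.7,
    }
    response = requests.post(
        "http://localhost:21002/worker_generate_stream",  # default api_base_url above
        headers={"User-Agent": "fastchat Client"},
        json=payload,
        stream=True,
    )
    for chunk in response.iter_lines(chunk_size=8192, decode_unicode=False, delimiter=b"\0"):
        if chunk:
            data = json.loads(chunk.decode("utf-8"))
            # "text" is cumulative and echoes the prompt, so strip the prompt prefix
            print(data["text"][len(payload["prompt"]):].strip())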
from abc import ABC
import requests
from typing import Optional, List
from langchain.llms.base import LLM
from models.loader import LoaderCheckPoint
from models.base import (BaseAnswer,
AnswerResult)
class FastChatLLM(BaseAnswer, LLM, ABC):
max_token: int = 10000
temperature: float = 0.01
top_p = 0.9
checkPoint: LoaderCheckPoint = None
# history = []
history_len: int = 10
def __init__(self, checkPoint: LoaderCheckPoint = None):
super().__init__()
self.checkPoint = checkPoint
@property
def _llm_type(self) -> str:
return "FastChat"
@property
def _check_point(self) -> LoaderCheckPoint:
return self.checkPoint
@property
def _history_len(self) -> int:
return self.history_len
def set_history_len(self, history_len: int = 10) -> None:
self.history_len = history_len
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
pass
def generatorAnswer(self, prompt: str,
history: List[List[str]] = [],
streaming: bool = False):
response = "fastchat 响应结果"
history += [[prompt, response]]
answer_result = AnswerResult()
answer_result.history = history
answer_result.llm_output = {"answer": response}
yield answer_result
from abc import ABC
import requests
from typing import Optional, List
from langchain.llms.base import LLM
from models.loader import LoaderCheckPoint
from models.base import (RemoteRpcModel,
AnswerResult)
from typing import (
Collection,
Dict
)
def _build_message_template() -> Dict[str, str]:
"""
:return: 结构
"""
return {
"role": "",
"content": "",
}
class FastChatOpenAILLM(RemoteRpcModel, LLM, ABC):
api_base_url: str = "http://localhost:8000/v1"
model_name: str = "chatglm-6b"
max_token: int = 10000
temperature: float = 0.01
top_p = 0.9
checkPoint: LoaderCheckPoint = None
    history: List[List[str]] = []
history_len: int = 10
def __init__(self, checkPoint: LoaderCheckPoint = None):
super().__init__()
self.checkPoint = checkPoint
@property
def _llm_type(self) -> str:
return "FastChat"
@property
def _check_point(self) -> LoaderCheckPoint:
return self.checkPoint
@property
def _history_len(self) -> int:
return self.history_len
def set_history_len(self, history_len: int = 10) -> None:
self.history_len = history_len
@property
def _api_key(self) -> str:
pass
@property
def _api_base_url(self) -> str:
return self.api_base_url
def set_api_key(self, api_key: str):
pass
def set_api_base_url(self, api_base_url: str):
self.api_base_url = api_base_url
def call_model_name(self, model_name):
self.model_name = model_name
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
pass
    # Convert the stored conversation history into an OpenAI-style chat message list.
    def build_message_list(self, query) -> Collection[Dict[str, str]]:
        message_list: List[Dict[str, str]] = []
        history = self.history[-self.history_len:] if self.history_len > 0 else []
        for old_query, response in history:
            user_build_message = _build_message_template()
            user_build_message['role'] = 'user'
            user_build_message['content'] = old_query
            system_build_message = _build_message_template()
            system_build_message['role'] = 'system'
            system_build_message['content'] = response
            message_list.append(user_build_message)
            message_list.append(system_build_message)

        user_build_message = _build_message_template()
        user_build_message['role'] = 'user'
        user_build_message['content'] = query
        message_list.append(user_build_message)
        return message_list
def generatorAnswer(self, prompt: str,
history: List[List[str]] = [],
streaming: bool = False):
try:
import openai
            # API keys are not validated yet; FastChat's OpenAI-compatible server accepts a placeholder.
            openai.api_key = "EMPTY"
openai.api_base = self.api_base_url
except ImportError:
raise ValueError(
"Could not import openai python package. "
"Please install it with `pip install openai`."
)
# create a chat completion
completion = openai.ChatCompletion.create(
model=self.model_name,
messages=self.build_message_list(prompt)
)
self.history += [[prompt, completion.choices[0].message.content]]
answer_result = AnswerResult()
answer_result.history = self.history
answer_result.llm_output = {"answer": completion.choices[0].message.content}
yield answer_result
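For reference, build_message_list flattens the stored history into an OpenAI-style chat payload before generatorAnswer calls openai.ChatCompletion.create. For a single stored turn and a new query it produces something like the illustration below; note that previous answers are mapped to the "system" role rather than the more usual "assistant" role.

    # Illustration of the messages payload built from
    # self.history = [["which city is this?", "tokyo"]] and query = "why?"
    messages = [
        {"role": "user", "content": "which city is this?"},
        {"role": "system", "content": "tokyo"},
        {"role": "user", "content": "why?"},
    ]
    # generatorAnswer then sends: openai.ChatCompletion.create(model=self.model_name, messages=messages)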
 import sys
+from typing import Any
 from models.loader.args import parser
 from models.loader import LoaderCheckPoint
 from configs.model_config import (llm_model_dict, LLM_MODEL)
@@ -8,7 +8,7 @@ from models.base import BaseAnswer
 loaderCheckPoint: LoaderCheckPoint = None

-def loaderLLM(llm_model: str = None, no_remote_model: bool = False, use_ptuning_v2: bool = False) -> BaseAnswer:
+def loaderLLM(llm_model: str = None, no_remote_model: bool = False, use_ptuning_v2: bool = False) -> Any:
     """
     init llm_model_ins LLM
     :param llm_model: model_name
@@ -34,7 +34,7 @@ def loaderLLM(llm_model: str = None, no_remote_model: bool = False, use_ptuning_
     loaderCheckPoint.model_path = llm_model_info["local_model_path"]

-    if 'fastChat' in loaderCheckPoint.model_name:
+    if 'FastChat' in loaderCheckPoint.model_name:
         loaderCheckPoint.unload_model()
     else:
         loaderCheckPoint.reload_model()
...
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../')
import asyncio
from argparse import Namespace
from models.loader.args import parser
from models.loader import LoaderCheckPoint
import models.shared as shared
async def dispatch(args: Namespace):
args_dict = vars(args)
shared.loaderCheckPoint = LoaderCheckPoint(args_dict)
llm_model_ins = shared.loaderLLM()
llm_model_ins.set_api_base_url("http://localhost:8000/v1")
llm_model_ins.call_model_name("chatglm-6b")
history = [
("which city is this?", "tokyo"),
("why?", "she's japanese"),
]
    for answer_result in llm_model_ins.generatorAnswer(prompt="她在做什么? ", history=history,  # prompt: "What is she doing?"
streaming=False):
resp = answer_result.llm_output["answer"]
print(resp)
if __name__ == '__main__':
args = None
args = parser.parse_args(args=['--model-dir', '/media/checkpoint/', '--model', 'fastChatOpenAI', '--no-remote-model'])
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(dispatch(args))
 import sys
 import os

-sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../')
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../../')
 import asyncio
 from argparse import Namespace
 from models.loader.args import parser
 from models.loader import LoaderCheckPoint
+from langchain.agents import initialize_agent, Tool
+from langchain.agents import AgentType
 import models.shared as shared
...