并发#
有状态函数支持配置并发度,单个函数实例可处理多个并发。不配置并发度的有状态函数实例默认是单线程的,对于同一个提交者的函数调用,将按顺序处理,但不同提交者间并不能保证执行顺序。您可以使用 InvokeOptions 接口配置有状态函数的并行度,并行度大于 1 时,不保证函数调用的处理顺序。
单实例单线程#
默认有状态函数单实例单线程,同一个提交者的函数调用顺序执行,不同提交者间不保证顺序。
import time
import yr
@yr.instance
class Counter:
def __init__(self):
self.count = 0
def add(self, n):
self.count += n
return self.count
# 模拟一个调用者
@yr.invoke
def caller(instance, n):
return yr.get(instance.add.invoke(n))
@yr.invoke
def delayed_output(n):
time.sleep(1)
return n
yr.init()
instance = Counter.invoke()
# 在远程实例中调用
result0 = caller.invoke(instance, delayed_output.invoke(1))
# 在主程序中调用
result1 = instance.add.invoke(2)
result2 = instance.add.invoke(3)
# 输出 [6, 2, 5]
# result1 和 result2 来自同一个调用者,顺序执行,结果为 2(0+2) 和 5(2+3)
# result0 来自另一个提交者,虽然第一个提交,但因为有延迟,最后一个执行,结果为 6(5+1)
print(yr.get([result0, result1, result2]))
instance.terminate()
yr.finalize()
#include <iostream>
#include <unistd.h>
#include "yr/yr.h"
class Counter {
public:
Counter() : count(0) {}
static Counter *FactoryCreate()
{
return new Counter();
}
int Add(int n)
{
count += n;
return count;
}
YR_STATE(count);
private:
int count;
};
YR_INVOKE(Counter::FactoryCreate, &Counter::Add)
int Caller(YR::NamedInstance<Counter> counter, int n)
{
return *YR::Get(counter.Function(&Counter::Add).Invoke(n));
}
YR_INVOKE(Caller)
int DelayedOutput(int n)
{
sleep(1);
return n;
}
YR_INVOKE(DelayedOutput)
int main(int argc, char *argv[])
{
YR::Init(YR::Config{}, argc, argv);
auto instance = YR::Instance(Counter::FactoryCreate).Invoke();
// 在远程实例中调用
auto n = YR::Function(DelayedOutput).Invoke(1);
auto result0 = YR::Function(Caller).Invoke(instance, n);
// 在主程序中调用
auto result1 = instance.Function(&Counter::Add).Invoke(2);
auto result2 = instance.Function(&Counter::Add).Invoke(3);
// 输出 6:2:5
// result1 和 result2 来自主程序中的调用,顺序执行,结果为 2(0+2) 和 5(2+3)
// result0 来自另一个提交者,虽然第一个调用,但因为有延迟,最后一个执行,结果为 6(5+1)
std::cout << *YR::Get(result0) << ":" << *YR::Get(result1) << ":" << *YR::Get(result2) << std::endl;
instance.Terminate();
YR::Finalize();
return 0;
}
// Counter.java
package com.example;
public class Counter {
private int count;
public Counter() {
this.count = 0;
}
public int add(int n) {
this.count += n;
return this.count;
}
}
// Main.java
package com.example;
import com.yuanrong.Config;
import com.yuanrong.InvokeOptions;
import com.yuanrong.api.YR;
import com.yuanrong.call.InstanceHandler;
import com.yuanrong.exception.YRException;
import com.yuanrong.runtime.client.ObjectRef;
public class Main {
public static class Caller {
public static int delayedOutput(int n) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return n;
}
public static int caller(int n) {
// 复用主程序中的具名函数实例
try {
InstanceHandler counter_exist = YR.instance(Counter::new, "counter-1", "demo").invoke();
ObjectRef refGet = counter_exist.function(Counter::add).invoke(n);
return (int)YR.get(refGet, 9);
} catch (YRException e) {
e.printStackTrace();
}
return n;
}
}
public static void main(String[] args) throws YRException {
YR.init();
// 创建具名函数实例,名称为 counter-1,命名空间为 demo
InstanceHandler counter = YR.instance(Counter::new, "counter-1", "demo").invoke();
// 在远程实例中调用
ObjectRef n = YR.function(Caller::delayedOutput).invoke(1);
ObjectRef result0 = YR.function(Caller::caller).invoke(n);
// 在主程序中调用
ObjectRef result1 = counter.function(Counter::add).invoke(2);
ObjectRef result2 = counter.function(Counter::add).invoke(3);
System.out.println(YR.get(result0, 9) + ":" + YR.get(result1, 9) + ":" + YR.get(result2, 9));
counter.terminate();
YR.Finalize();
}
}
配置单实例多线程并发#
配置单实例多线程支持多并发,此时,不论是同一个提交者还是不同提交者,都不保证执行顺序。
import yr
@yr.instance
class Counter:
def __init__(self):
self.count = 0
def add(self, n):
self.count += n
return self.count
if __name__ == "__main__":
yr.init()
# 配置实例并发度为 2
opt = yr.InvokeOptions()
opt.concurrency = 2
counter = Counter.options(opt).invoke()
result0 = counter.add.invoke(1)
result1 = counter.add.invoke(2)
# 输出可能是 1 3,也可能是 3 2
print(yr.get(result0), yr.get(result1))
counter.terminate()
yr.finalize()
#include <iostream>
#include "yr/yr.h"
class Counter {
public:
Counter() : count(0) {}
static Counter *FactoryCreate()
{
return new Counter();
}
int Add(int n)
{
count += n;
return count;
}
YR_STATE(count);
private:
int count;
};
YR_INVOKE(Counter::FactoryCreate, &Counter::Add)
int main(int argc, char *argv[])
{
YR::Init(YR::Config{}, argc, argv);
// 配置实例并发度为 2
YR::InvokeOptions opt;
opt.customExtensions.insert({"Concurrency", "2"});
auto counter = YR::Instance(Counter::FactoryCreate).Options(opt).Invoke();
auto result0 = counter.Function(&Counter::Add).Invoke(1);
auto result1 = counter.Function(&Counter::Add).Invoke(2);
// 输出可能是 1:3,也可能是 3:2
std::cout << *YR::Get(result0) << ":" << *YR::Get(result1) << std::endl;
counter.Terminate();
YR::Finalize();
return 0;
}
// Counter.java
package com.example;
public class Counter {
private int count;
public Counter() {
this.count = 0;
}
public int add(int n) {
this.count += n;
return this.count;
}
}
// Main.java
package com.example;
import com.yuanrong.Config;
import com.yuanrong.InvokeOptions;
import com.yuanrong.api.YR;
import com.yuanrong.call.InstanceHandler;
import com.yuanrong.exception.YRException;
import com.yuanrong.runtime.client.ObjectRef;
import java.util.HashMap;
public class Main {
public static void main(String[] args) throws YRException {
YR.init();
// 配置实例并发度为 2
HashMap<String, String> customExtensions = new HashMap<>();
customExtensions.put("Concurrency", "2");
InvokeOptions opt = new InvokeOptions();
opt.setCustomExtensions(customExtensions);
InstanceHandler counter = YR.instance(Counter::new).options(opt).invoke();
ObjectRef result0 = counter.function(Counter::add).invoke(1);
ObjectRef result1 = counter.function(Counter::add).invoke(2);
// 输出可能是 1:3,也可能是 3:2
System.out.println(YR.get(result0, 9) + ":" + YR.get(result1, 9));
counter.terminate();
YR.Finalize();
}
}