aboutsummaryrefslogtreecommitdiff
path: root/engine/internal.go
diff options
context:
space:
mode:
Diffstat (limited to 'engine/internal.go')
-rw-r--r--engine/internal.go165
1 files changed, 165 insertions, 0 deletions
diff --git a/engine/internal.go b/engine/internal.go
new file mode 100644
index 0000000..367807d
--- /dev/null
+++ b/engine/internal.go
@@ -0,0 +1,165 @@
+package engine
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "runtime"
+ "strconv"
+ "time"
+)
+
+type Transport interface {
+ Write(data string, addr string, port int)
+}
+
+type udpTransport struct {
+ conn *net.UDPConn
+}
+
+func (p *udpTransport) setConn(conn *net.UDPConn) {
+ p.conn = conn
+}
+
+func (p *udpTransport) Write(data string, addr string, port int) {
+ laddr, err := net.ResolveUDPAddr("udp", addr+":"+strconv.Itoa(port))
+ if err != nil {
+ fmt.Println("resolve addr err: ", err)
+ return
+ }
+ _, er := p.conn.WriteTo([]byte(data), laddr)
+ if er != nil {
+ fmt.Println("resolve addr err: ", err)
+ return
+ }
+}
+
+type _reactor struct {
+ udp_listeners map[int]UdpClient
+ udp_conn map[int]*net.UDPConn
+ timer []*LaterCalling
+}
+
+func (p *_reactor) ListenUdp(addaport string, udp UdpClient) {
+ laddr, err := net.ResolveUDPAddr("udp", addaport)
+ if err == nil {
+ p.listenUdp(laddr, udp)
+ } else {
+ log.Println("resolve addr err: ", err)
+ return
+ }
+}
+
+type UdpClient interface {
+ DatagramReceived(data []byte, addr net.Addr)
+ SetUdpTransport(Transport)
+}
+
+func (p *_reactor) listenUdp(addr *net.UDPAddr, udp UdpClient) {
+ p.initReactor()
+ p.udp_listeners[addr.Port] = udp
+ log.Println("ntp server listening on (UDP)", addr.String())
+ c, erl := net.ListenUDP("udp", addr)
+ if erl != nil {
+ log.Fatal("listen error: ", erl)
+ } else {
+ p.udp_conn[addr.Port] = c
+ }
+ transport := new(udpTransport)
+ transport.setConn(c)
+ udp.SetUdpTransport(transport)
+}
+
+func (p *_reactor) initReactor() {
+ if p.udp_listeners == nil {
+ p.udp_listeners = make(map[int]UdpClient)
+ }
+ if p.udp_conn == nil {
+ p.udp_conn = make(map[int]*net.UDPConn)
+ }
+}
+
+var (
+ Reactor = new(_reactor)
+ listening_chan chan int
+)
+
+type LaterCalling struct {
+ millisecond int
+ call func()
+}
+
+type reactor interface {
+ ListenUdp(port int, client UdpClient)
+ CallLater(microsecond int, latercaller func())
+ Run()
+}
+
+func (p *_reactor) CallLater(millisecond int, lc func()) {
+ calling := new(LaterCalling)
+ calling.millisecond = millisecond
+ calling.call = lc
+ p.timer = append(p.timer, calling)
+}
+
+func (p *_reactor) Run() {
+ runtime.GOMAXPROCS(len(p.udp_conn))
+ for port, l := range p.udp_conn {
+ go handleUdpConnection(l, p.udp_listeners[port])
+ }
+ for len(p.timer) > 0 {
+ caller := p.timer[0]
+ p.timer = p.timer[1:]
+ selectTimer(caller)
+ }
+ for {
+ fmt.Println("------------------ Logs and Errors ------------------")
+ select {
+ case <-listening_chan:
+ fmt.Println("-----------------------------------------------------")
+
+ }
+ }
+}
+
+func selectTimer(caller *LaterCalling) {
+ select {
+ case <-time.After(time.Duration(caller.millisecond) * time.Millisecond):
+ caller.call()
+ }
+}
+
+func handleUdpConnection(conn *net.UDPConn, client UdpClient) {
+ for {
+ data := make([]byte, 512)
+ read_length, remoteAddr, err := conn.ReadFromUDP(data[0:])
+ if err != nil {
+ return
+ } else {
+ }
+ if read_length > 0 {
+ go panicWrapping(func() {
+ client.DatagramReceived(data[0:read_length], remoteAddr)
+ })
+ }
+ }
+}
+
+func panicWrapping(f func()) {
+ defer func() {
+ recover()
+ }()
+ f()
+}
+
+type UdpHandler struct {
+ udptransport Transport
+}
+
+func (p *UdpHandler) SetUdpTransport(transport Transport) {
+ p.udptransport = transport
+}
+
+func (p *UdpHandler) UdpWrite(data string, addr string, port int) {
+ p.udptransport.Write(data, addr, port)
+}

Snix LLC Git Repository Holder Copyright(C) 2022 All Rights Reserved Email To Snix.IR